| /*****************************************************************************\ |
 *  mvapich.c - srun support for MPICH-IB (MVAPICH 0.9.4, 0.9.5, 0.9.7, 0.9.8)
| ***************************************************************************** |
| * Copyright (C) 2004-2007 The Regents of the University of California. |
| * Copyright (C) 2008-2009 Lawrence Livermore National Security. |
| * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). |
| * CODE-OCEC-09-009. All rights reserved. |
| * |
| * This file is part of SLURM, a resource management program. |
| * For details, see <https://computing.llnl.gov/linux/slurm/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * SLURM is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with SLURM; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #ifdef HAVE_CONFIG_H |
| # include "config.h" |
| #endif |
| |
| #ifdef WITH_PTHREADS |
| # include <pthread.h> |
| #endif |
| |
| #include <stdlib.h> |
| #include <signal.h> |
| #include <sys/types.h> |
| #include <sys/socket.h> |
| #include <netinet/in.h> |
| #include <strings.h> |
| #include <sys/poll.h> |
| #include <sys/time.h> |
| |
| #include "src/common/slurm_xlator.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/xstring.h" |
| #include "src/common/net.h" |
| #include "src/common/fd.h" |
| |
| /* |
| * 2008-07-03: |
| * |
| * This version of mvapich.c has been tested against the following |
| * protocol versions: |
| * |
| * Version 8: (pmgr_collective): mvapich-1.0.1, mvapich-1.0 |
| * Version 5: mvapich-0.9.9 r1760, mvapich-0.9.7-mlx2.2.0 |
| * Version 3: mvapich-0.9.8 |
| */ |
| |
/* NOTE: MVAPICH has changed protocols without changing version numbers.
 * This makes support of MVAPICH very difficult.
 * Support for the following versions has been validated:
 *
 * For MVAPICH-GEN2-1.0-103,    set MVAPICH_VERSION_REQUIRES_PIDS to 2
 * For MVAPICH 0.9.4 and 0.9.5, set MVAPICH_VERSION_REQUIRES_PIDS to 3
 *
 * See the function mvapich_requires_pids() below for other MVAPICH
 * versions.
 */
| #define MVAPICH_VERSION_REQUIRES_PIDS 3 |
| |
| #include "mvapich.h" |
| |
| /* NOTE: AIX lacks timersub */ |
| /* Why are we using mvapich on AIX? */ |
| #ifndef timersub |
| # define timersub(a, b, result) \ |
| do { \ |
| (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \ |
| (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \ |
| if ((result)->tv_usec < 0) { \ |
| --(result)->tv_sec; \ |
| (result)->tv_usec += 1000000; \ |
| } \ |
| } while (0) |
| #endif |
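
/*
 * Worked example of the borrow case handled above: with
 * a = {5s, 100000us} and b = {3s, 900000us}, the raw difference is
 * {2s, -800000us}, which the normalization branch turns into
 * result = {1s, 200000us}.
 */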
| |
| |
| /* |
| * MVAPICH initialization data state codes |
| */ |
| enum mv_init_state |
| { |
| MV_READ_VERSION, |
| MV_READ_RANK, |
| MV_READ_HOSTIDLEN, |
| MV_READ_HOSTID, |
| MV_READ_ADDRLEN, |
| MV_READ_ADDRS, |
| MV_READ_PIDLEN, |
| MV_READ_PID, |
| MV_INIT_DONE, |
| }; |
| |
| /* |
| * Information cache for each MVAPICH process |
| */ |
| struct mvapich_info |
| { |
| int do_poll; |
| enum mv_init_state state; /* Initialization state */ |
| int nread; /* Amount of data read so far */ |
| int nwritten; /* Amount of data written */ |
| |
| int fd; /* fd for socket connection to MPI task */ |
| int version; /* Protocol version for this rank */ |
| int rank; /* This process' MPI rank */ |
| int pidlen; /* length of pid buffer */ |
| char *pid; /* This rank's local pid (V3 only) */ |
| int hostidlen; /* Host id length */ |
| int hostid; /* Separate hostid (for protocol v5) */ |
| int addrlen; /* Length of addr array in bytes */ |
| |
| int *addr; /* This process' address array, which for |
| * process rank N in an M process job |
| * looks like: |
| * |
| * qp0,qp1,..,lid,qpN+1,..,qpM-1, hostid |
| * |
| * Where position N is this rank's lid, |
| * and the hostid is tacked onto the end |
| * of the array (for protocol version 3) |
| */ |
| }; |
| |
| /* Globals for the mvapich thread. |
| */ |
| int mvapich_verbose = 0; |
| |
| static time_t first_abort_time = 0; |
| |
| /* Per-job step state information. The MPI plugin may be called |
| * multiple times from the SLURM API's slurm_step_launch() in the |
| * same process. |
| */ |
| struct mvapich_state { |
| pthread_t tid; |
| struct mvapich_info **mvarray; |
| int fd; |
| int nprocs; |
| int nconnected; |
| int protocol_version; |
| int protocol_phase; |
| int connect_once; |
| int do_timing; |
| |
| int timeout; /* Initialization timeout in seconds */ |
| int start_time; /* Time from which to measure timeout */ |
| |
| int shutdown_pipe[2]; /* Write to this pipe to interrupt poll calls */ |
| bool shutdown_complete; /* Set true when mpi thr about to exit */ |
| int shutdown_timeout; /* Num secs for main thread to wait for |
| mpi thread to finish */ |
| |
| pthread_mutex_t shutdown_lock; |
| pthread_cond_t shutdown_cond; |
| |
| mpi_plugin_client_info_t job[1]; |
| }; |
| |
| /* |
| * MVAPICH poll structure used by mvapich_poll_next, etc. |
| */ |
| struct mvapich_poll |
| { |
| mvapich_state_t *st; |
| struct mvapich_info **mvmap; |
| struct pollfd *fds; |
| int counter; |
| int nfds; |
| }; |
| |
| |
| /* |
| * mvapich debugging defines |
| */ |
#define mvapich_debug(args...) \
	do { \
		if (mvapich_verbose) \
			info ("mvapich: " args); \
	} while (0)

#define mvapich_debug2(args...) \
	do { \
		if (mvapich_verbose > 1) \
			info ("mvapich: " args); \
	} while (0)

#define mvapich_debug3(args...) \
	do { \
		if (mvapich_verbose > 2) \
			info ("mvapich: " args); \
	} while (0)
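
/*
 * Example: with SLURM_MVAPICH_DEBUG=2 in the environment (see
 * process_environment() below), mvapich_debug() and mvapich_debug2()
 * messages are logged via info() while mvapich_debug3() stays silent:
 *
 *	mvapich_debug2 ("rank %d: opcode=%d", rank, opcode);
 */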
| |
| |
| static void do_timings (mvapich_state_t *st, const char *fmt, ...) |
| __attribute__ ((format (printf, 2, 3))); |
| void mvapich_thr_exit(mvapich_state_t *st); |
| |
| static int mvapich_requires_pids (mvapich_state_t *st) |
| { |
| if ( st->protocol_version == MVAPICH_VERSION_REQUIRES_PIDS |
| || st->protocol_version == 5 |
| || st->protocol_version == 6 ) |
| return (1); |
| return (0); |
| } |
| |
| /* |
| * Return the number of ms left until the MVAPICH startup |
| * timeout expires. |
| */ |
| static int startup_timeout (mvapich_state_t *st) |
| { |
| time_t now; |
| time_t remaining; |
| |
| if (st->timeout <= 0) |
| return (-1); |
| |
| now = time (NULL); |
| |
| if (!st->start_time) |
| return (-1); |
| |
| remaining = st->timeout - (now - st->start_time); |
| |
| if (remaining >= 0) |
| return ((int) remaining * 1000); |
| else |
| return (0); |
| } |
| |
| char * vmsg (const char *msg, va_list ap) |
| { |
| int n = -1; |
| int size = BUFSIZ; |
| va_list vp; |
| char *p = xmalloc (size); |
| |
| while (1) { |
| va_copy (vp, ap); |
| n = vsnprintf (p, size, msg, vp); |
| va_end (vp); |
| |
| if (n > -1 && n < size) |
| return (p); |
| |
| if (n > -1) |
| size = n + 1; |
| else if (n == -1) |
| size *= 2; |
| |
| p = xrealloc (p, size); |
| } |
| |
| return (p); |
| } |
| |
| |
| /* |
| * Forcibly kill job (with optional error message). |
| */ |
| static void mvapich_terminate_job (mvapich_state_t *st, const char *msg, ...) |
| { |
| if (msg) { |
| va_list ap; |
| va_start (ap, msg); |
| char *p = vmsg (msg, ap); |
| error ("mvapich: %s", p); |
| xfree (p); |
| } |
| |
| slurm_kill_job_step (st->job->jobid, st->job->stepid, SIGKILL); |
| kill(getpid(), SIGTERM); /* Needed for better srun cleanup */ |
| pthread_exit(NULL); |
| /* NORETURN */ |
| } |
| |
| static struct mvapich_info *mvapich_info_find (mvapich_state_t *st, int rank) |
| { |
| int i; |
| |
| for (i = 0; i < st->nprocs; i++) { |
| if (st->mvarray[i] && st->mvarray[i]->rank == rank) |
| return (st->mvarray[i]); |
| } |
| return (NULL); |
| } |
| |
| /* |
| * Issue a report of tasks/hosts that we may be waiting for. |
| * by checking either mvi->fd < 0 || mvi->do_poll == 1. |
| */ |
| static void report_absent_tasks (mvapich_state_t *st, int check_do_poll) |
| { |
| int i; |
| char buf[16]; |
| hostlist_t tasks = hostlist_create (NULL); |
| hostlist_t hosts = hostlist_create (NULL); |
| slurm_step_layout_t *sl = st->job->step_layout; |
| |
| for (i = 0; i < st->nprocs; i++) { |
		struct mvapich_info *m = mvapich_info_find (st, i);
| |
| if ((m == NULL) || (m->fd < 0) || (check_do_poll && m->do_poll)) { |
| const char *host = slurm_step_layout_host_name (sl, i); |
| sprintf (buf, "%d", i); |
| hostlist_push (tasks, buf); |
| hostlist_push (hosts, host); |
| } |
| } |
| |
| if (hostlist_count (tasks)) { |
| char r [4096]; |
| char h [4096]; |
| hostlist_uniq (hosts); |
| int nranks = hostlist_count (tasks); |
| int nhosts = hostlist_count (hosts); |
| hostlist_ranged_string (tasks, sizeof(r), r); |
| hostlist_ranged_string (hosts, sizeof(h), h); |
| error ("mvapich: timeout: waiting on rank%s %s on host%s %s.", |
| nranks > 1 ? "s" : "", r, |
| nhosts > 1 ? "s" : "", h); |
| } |
| |
| hostlist_destroy (hosts); |
| hostlist_destroy (tasks); |
| } |
| |
| |
| static struct mvapich_info * mvapich_info_create (void) |
| { |
| struct mvapich_info *mvi = xmalloc (sizeof (*mvi)); |
| memset (mvi, 0, sizeof (*mvi)); |
| mvi->fd = -1; |
| mvi->rank = -1; |
| mvi->state = MV_READ_VERSION; |
| mvi->nread = 0; |
| |
| return (mvi); |
| } |
| |
| static void mvapich_info_destroy (struct mvapich_info *mvi) |
| { |
| xfree (mvi->addr); |
| xfree (mvi->pid); |
| xfree (mvi); |
| return; |
| } |
| |
| /* |
| * Reset an mvapich_poll object so it may be used again. |
| */ |
| static void mvapich_poll_reset (struct mvapich_poll *mp) |
| { |
| int i; |
| mp->counter = 0; |
| mp->nfds = 0; |
| |
| /* |
| * Reset mvapich_info do_poll attribute. |
| */ |
| for (i = 0; i < mp->st->nprocs; i++) |
| mp->st->mvarray[i]->do_poll = 1; |
| return; |
| } |
| |
| |
| /* |
| * Create an mvapich_poll object, used to poll all mvapich |
| * file descriptors for read/write activity |
| * |
| * Resets do_poll for all mvapich_info objects in mvarray to 1. |
| * (Thus, only one mvapich_poll should be in use at a time) |
| */ |
| static struct mvapich_poll * mvapich_poll_create (mvapich_state_t *st) |
| { |
| struct mvapich_poll *mp = xmalloc (sizeof (*mp)); |
| |
| mp->mvmap = xmalloc (st->nprocs * sizeof (struct mvapich_info *)); |
| mp->fds = xmalloc (st->nprocs * sizeof (struct pollfd)); |
| mp->st = st; |
| |
| mvapich_poll_reset (mp); |
| |
| return (mp); |
| } |
| |
| static void mvapich_poll_destroy (struct mvapich_poll *mp) |
| { |
| xfree (mp->mvmap); |
| xfree (mp->fds); |
| xfree (mp); |
| } |
| |
| |
| /* |
| * Call poll(2) on mvapich_poll object, handling EAGAIN and EINTR errors. |
| */ |
| static int mvapich_poll_internal (struct mvapich_poll *mp) |
| { |
| int n; |
| while ((n = poll (mp->fds, mp->nfds, startup_timeout (mp->st))) < 0) { |
| if (errno != EINTR && errno != EAGAIN) |
| return (-1); |
| } |
| return (n); |
| } |
| |
| /* |
| * Poll for next available mvapich_info object with read/write activity |
| * |
| * Returns NULL when no more mvapich fds need to be polled. |
| * |
| * The caller is responsible for updating mvi->do_poll to indicate |
| * when a mvapich_info object's file descriptor no longer needs |
| * to be polled for activity. |
| * |
| */ |
| static struct mvapich_info * |
| mvapich_poll_next (struct mvapich_poll *mp, int do_read) |
| { |
| int i, rc; |
| int event = do_read ? POLLIN : POLLOUT; |
| mvapich_state_t *st = mp->st; |
| |
| again: |
| /* |
| * If the loop counter is 0, then we need to reset data structures |
| * and poll again. |
| */ |
| if (mp->counter == 0) { |
| int j = 0; |
| |
		memset (mp->fds, 0, st->nprocs * sizeof (struct pollfd));
		memset (mp->mvmap, 0, st->nprocs * sizeof (*mp->mvmap));
| mp->nfds = 0; |
| |
| for (i = 0; i < st->nprocs; i++) { |
| struct mvapich_info *mvi = mp->st->mvarray [i]; |
| if (mvi->do_poll) { |
| mp->mvmap[j] = mvi; |
| mp->fds[j].fd = mvi->fd; |
| mp->fds[j].events = event; |
| j++; |
| mp->nfds++; |
| } |
| } |
| |
| /* |
| * If there are no more file descriptors to poll, then |
| * return NULL to indicate we're done. |
| */ |
| if (mp->nfds == 0) |
| return (NULL); |
| |
| mvapich_debug3 ("mvapich_poll_next (nfds=%d, timeout=%d)", |
| mp->nfds, startup_timeout (st)); |
| if ((rc = mvapich_poll_internal (mp)) < 0) |
| mvapich_terminate_job (st, "mvapich_poll_next: %m"); |
| else if (rc == 0) { |
| /* |
| * If we timed out, then report all tasks that we were |
| * still waiting for. |
| */ |
| report_absent_tasks (st, 1); |
| mvapich_terminate_job (st, NULL); |
| } |
| } |
| |
| /* |
| * Loop through poll fds and return first mvapich_info object |
| * we find that has the requested read/write activity. |
| * When found, we update the loop counter, and return |
| * the corresponding mvapich_info object. |
| * |
| */ |
| for (i = mp->counter; i < mp->nfds; i++) { |
| if (mp->fds[i].revents == event) { |
| mp->counter = i+1; |
| return (mp->mvmap[i]); |
| } |
| } |
| |
| mp->counter = 0; |
| goto again; |
| |
| return (NULL); |
| } |
| |
| |
| static int mvapich_poll (mvapich_state_t *st, struct mvapich_info *mvi, |
| int write) { |
| int rc = 0; |
| struct pollfd pfds[1]; |
| int timeout; |
| |
| pfds->fd = mvi->fd; |
| pfds->events = write ? POLLOUT : POLLIN; |
| |
| timeout = startup_timeout (st); |
| while (timeout && (rc = poll (pfds, 1, startup_timeout (st))) < 0) { |
| if (errno != EINTR) |
| return (-1); |
| } |
| |
| /* |
| * If poll() timed out, forcibly kill job and exit instead of |
| * waiting longer for remote IO, process exit, etc. |
| */ |
| if (rc == 0) { |
| if (mvi->rank >= 0) { |
| slurm_step_layout_t *sl = st->job->step_layout; |
| const char *host = slurm_step_layout_host_name (sl, mvi->rank); |
| error("Timeout waiting to read from MPI rank %d [on %s]. Exiting.", |
| mvi->rank, host); |
| } |
| else { |
| report_absent_tasks (st, 0); |
| } |
| |
| mvapich_terminate_job (st, NULL); |
| /* NORETURN */ |
| } |
| |
| return (rc); |
| } |
| |
| static int mvapich_write (struct mvapich_info *mvi, void * buf, size_t len) |
| { |
| size_t nleft; |
| ssize_t n; |
| unsigned char *p; |
| |
| p = buf + mvi->nwritten; |
| nleft = len - mvi->nwritten; |
| |
| n = write (mvi->fd, p, nleft); |
| |
| if ((n < 0) && (errno != EAGAIN)) { |
| error ("mvapich: rank %d: write (%zd/%zd): %m", |
| mvi->rank, nleft, len); |
| return (-1); |
| } |
| |
| if (n > 0) |
| mvi->nwritten += n; |
| |
| if (mvi->nwritten == len) { |
| mvi->nwritten = 0; |
| mvi->do_poll = 0; |
| } |
| |
| return (0); |
| } |
| |
| static int mvapich_read (struct mvapich_info *mvi, void * buf, size_t len) |
| { |
| size_t nleft; |
| ssize_t n; |
| unsigned char *p; |
| |
| p = buf + mvi->nread; |
| nleft = len - mvi->nread; |
| |
| n = read (mvi->fd, p, nleft); |
| |
| if ((n < 0) && (errno != EAGAIN)) { |
| error ("mvapich: rank %d: read (%zd/%zd): %m", |
| mvi->rank, nleft, len); |
| return (-1); |
| } |
| |
| if (n > 0) |
| mvi->nread += n; |
| |
| if (mvi->nread == len) { |
| mvi->nread = 0; |
| mvi->do_poll = 0; |
| } |
| |
| return (0); |
| } |
| |
| static int mvapich_write_n (mvapich_state_t *st, struct mvapich_info *mvi, |
| void *buf, size_t len) |
| { |
| int nleft = len; |
| int n = 0; |
| unsigned char * p = buf; |
| |
| while (nleft > 0) { |
| /* Poll for write-activity */ |
| if (mvapich_poll (st, mvi, 1) < 0) |
| return (-1); |
| |
| if ((n = write (mvi->fd, p, nleft)) < 0) { |
| if (errno == EAGAIN || errno == EINTR) |
| continue; |
| return (-1); |
| } |
| |
| nleft -= n; |
| p += n; |
| } |
| |
| return (len - nleft); |
| } |
| |
| static int mvapich_read_n (mvapich_state_t *st, struct mvapich_info *mvi, |
| void *buf, size_t len) |
| { |
| int nleft = len; |
| int n = 0; |
| unsigned char * p = buf; |
| |
| while (nleft > 0) { |
| /* Poll for read-activity */ |
| if (mvapich_poll (st, mvi, 0) < 0) |
| return (-1); |
| |
| if ((n = read (mvi->fd, p, nleft)) < 0) { |
| if (errno == EAGAIN || errno == EINTR) |
| continue; |
| return (-1); |
| } |
| |
| if (n == 0) { /* unexpected EOF */ |
| error ("mvapich: rank %d: " |
| "Unexpected EOF (%dB left to read)", |
| mvi->rank, nleft); |
| return (-1); |
| } |
| |
| nleft -= n; |
| p += n; |
| } |
| |
| return (len - nleft); |
| } |
| |
| |
| /* |
| * Return non-zero if protocol version has two phases. |
| */ |
| static int mvapich_dual_phase (mvapich_state_t *st) |
| { |
| return (st->protocol_version == 5 || st->protocol_version == 6); |
| } |
| |
| static int mvapich_abort_sends_rank (mvapich_state_t *st) |
| { |
| if (st->protocol_version >= 3) |
| return (1); |
| return (0); |
| } |
| |
| /* |
| * Broadcast addr information to all connected mvapich processes. |
| * The format of the information sent back to each process is: |
| * |
| * for rank N in M process job: |
| * |
| * lid info : lid0,lid1,...lidM-1 |
| * qp info : qp0, qp1, ..., -1, qpN+1, ...,qpM-1 |
| * hostids : hostid0,hostid1,...,hostidM-1 |
| * |
| * total of 3*nprocs ints. |
| * |
| */ |
| static void mvapich_bcast_addrs (mvapich_state_t *st) |
| { |
| struct mvapich_info *m; |
| int out_addrs_len = 3 * st->nprocs * sizeof (int); |
| int *out_addrs = xmalloc (out_addrs_len); |
| int i = 0; |
| int j = 0; |
| |
| mvapich_debug2 ("Bcasting addrs to %d tasks", st->nprocs); |
| |
| for (i = 0; i < st->nprocs; i++) { |
| m = st->mvarray[i]; |
| /* |
| * lids are found in addrs[rank] for each process |
| */ |
| out_addrs[i] = m->addr[m->rank]; |
| |
| /* |
| * hostids are the last entry in addrs |
| */ |
| out_addrs[2 * st->nprocs + i] = |
| m->addr[(m->addrlen/sizeof (int)) - 1]; |
| } |
| |
| for (i = 0; i < st->nprocs; i++) { |
| m = st->mvarray[i]; |
| |
| /* |
| * qp array is tailored to each process. |
| */ |
| for (j = 0; j < st->nprocs; j++) |
| out_addrs[st->nprocs + j] = |
| (i == j) ? -1 : st->mvarray[j]->addr[i]; |
| |
| mvapich_debug3 ("writing addrs to task %d", i); |
| mvapich_write_n (st, m, out_addrs, out_addrs_len); |
| if (mvapich_verbose > 2) |
| do_timings (st, "Write addrs to task %d", i); |
| |
| /* |
| * Protocol version 3 requires pid list to be sent next |
| */ |
| if (mvapich_requires_pids (st)) { |
| for (j = 0; j < st->nprocs; j++) |
				mvapich_write_n (st, m,
					st->mvarray[j]->pid,
					st->mvarray[j]->pidlen);
| } |
| |
| } |
| |
| xfree (out_addrs); |
| return; |
| } |
| |
| static void mvapich_bcast_hostids (mvapich_state_t *st) |
| { |
| struct mvapich_poll *mp; |
| struct mvapich_info *mvi; |
| int * hostids; |
| int i = 0; |
| size_t len = st->nprocs * sizeof (int); |
| |
| hostids = xmalloc (len); |
| |
| for (i = 0; i < st->nprocs; i++) |
| hostids [i] = st->mvarray[i]->hostid; |
| |
| /* |
| * Broadcast hostids |
| */ |
| mvapich_debug ("bcasting hostids"); |
| mp = mvapich_poll_create (st); |
| while ((mvi = mvapich_poll_next (mp, 0))) { |
| if (mvapich_write (mvi, hostids, len) < 0) |
| mvapich_terminate_job (st, "write hostid rank %d: %m", mvi->rank); |
| } |
| xfree (hostids); |
| |
| /* |
| * Read connect_once value from every rank |
| * Each rank will either close the connection (connect_once = 0) |
| * or send the connect_once value (presumed 1). |
| */ |
| mvapich_debug ("reading connect once value"); |
| mvapich_poll_reset (mp); |
| while ((mvi = mvapich_poll_next (mp, 1))) { |
| int co = 1, rc; |
| mvapich_debug3 ("reading connect once value from rank %d fd=%d", |
| mvi->rank, mvi->fd); |
| if ((rc = read (mvi->fd, &co, sizeof (int))) <= 0) { |
| mvapich_debug2 ("reading connect once value rc=%d: %m", |
| rc); |
| close (mvi->fd); |
| mvi->fd = -1; |
| st->connect_once = 0; |
| } |
| mvi->do_poll = 0; |
| } |
| |
| mvapich_poll_destroy (mp); |
| return; |
| } |
| |
| /* Write size bytes from buf into socket for rank */ |
| static int mvapich_send (mvapich_state_t *st, void* buf, int size, int rank) |
| { |
| struct mvapich_info *mvi = st->mvarray [rank]; |
| return (mvapich_write_n (st, mvi, buf, size)); |
| } |
| |
| /* Read size bytes from socket for rank into buf */ |
| static int mvapich_recv (mvapich_state_t *st, void* buf, int size, int rank) |
| { |
| struct mvapich_info *mvi = st->mvarray [rank]; |
| return (mvapich_read_n (st, mvi, buf, size)); |
| } |
| |
| /* Scatter data in buf to ranks using chunks of size bytes */ |
| static int mvapich_scatterbcast (mvapich_state_t *st, void* buf, int size) |
| { |
| int rc = 0; |
| int n = 0; |
| struct mvapich_poll *mp; |
| struct mvapich_info *mvi; |
| |
| mp = mvapich_poll_create (st); |
| while ((mvi = mvapich_poll_next (mp, 0))) { |
| if ((rc = mvapich_write (mvi, buf + (mvi->rank * size), size)) < 0) |
| break; |
| n += rc; |
| } |
| mvapich_poll_destroy (mp); |
| |
| return (rc < 0 ? rc : n); |
| } |
| |
/* Broadcast buf, which is size bytes long, to each rank */
| static int mvapich_allgatherbcast (mvapich_state_t *st, void* buf, int size) |
| { |
| int rc = 0; |
| int n = 0; |
| struct mvapich_poll *mp; |
| struct mvapich_info *mvi; |
| |
| mp = mvapich_poll_create (st); |
| while ((mvi = mvapich_poll_next (mp, 0))) { |
| if ((rc = mvapich_write (mvi, buf, size)) < 0) |
| break; |
| n += rc; |
| } |
| mvapich_poll_destroy (mp); |
| |
| return (rc < 0 ? rc : n); |
| } |
| |
| /* Perform alltoall using data in buf with elements of size bytes */ |
| static int mvapich_alltoallbcast (mvapich_state_t *st, void* buf, int size) |
| { |
| int pbufsize = size * st->nprocs; |
| void* pbuf = xmalloc(pbufsize); |
| int i, src, rc = 0; |
| int n = 0; |
| |
| for (i = 0; i < st->nprocs; i++) { |
| for (src = 0; src < st->nprocs; src++) { |
| memcpy( pbuf + size*src, |
| buf + size*(src*st->nprocs + i), |
| size |
| ); |
| } |
| if ((rc = mvapich_send (st, pbuf, pbufsize, i)) <= 0) |
| goto out; |
| n += rc; |
| } |
| |
| out: |
| xfree(pbuf); |
| return (rc < 0 ? rc : n); |
| } |
| |
| static int recv_common_value (mvapich_state_t *st, int *valp, int rank) |
| { |
| int val; |
| if (mvapich_recv (st, &val, sizeof (int), rank) <= 0) { |
| error ("mvapich: recv_common_value: rank %d: %m", rank); |
| return (-1); |
| } |
| mvapich_debug3 ("recv_common_value (rank=%d, val=%d)", rank, *valp); |
| |
| /* |
| * If value is uninitialized, set it to current value, |
| * otherwise ensure that current value matches previous |
| */ |
| if (*valp == -1) |
| *valp = val; |
| else if (val != *valp) { |
| error ("mvapich: PMGR: unexpected value from rank %d: " |
| "expected %d, recvd %d", rank, *valp, val); |
| return (-1); |
| } |
| return (0); |
| } |
| |
| /* |
| * PMGR_BCAST (root, size of message, then message data (from root only)) |
| */ |
| static int process_pmgr_bcast (mvapich_state_t *st, int *rootp, int *sizep, |
| void ** bufp, int rank) |
| { |
| if (recv_common_value (st, rootp, rank) < 0) |
| return (-1); |
| if (recv_common_value (st, sizep, rank) < 0) |
| return (-1); |
| if (rank != *rootp) |
| return (0); |
| |
| /* |
| * Recv data from root |
| */ |
| *bufp = xmalloc (*sizep); |
| mvapich_debug3 ("PMGR_BCAST: recv from root"); |
| if (mvapich_recv (st, *bufp, *sizep, rank) < 0) { |
| error ("mvapich: PMGR_BCAST: Failed to recv from root: %m"); |
| return (-1); |
| } |
| return (0); |
| } |
| |
| /* |
| * PMGR_GATHER (root, size of message, then message data) |
| */ |
| static int process_pmgr_gather (mvapich_state_t *st, int *rootp, |
| int *sizep, void **bufp, int rank) |
| { |
| if (recv_common_value (st, rootp, rank) < 0) |
| return (-1); |
| if (recv_common_value (st, sizep, rank) < 0) |
| return (-1); |
| if (*bufp == NULL) |
| *bufp = xmalloc (*sizep * st->nprocs); |
| |
| mvapich_debug3 ("PMGR_GATHER: recv from rank %d", rank); |
| if (mvapich_recv(st, (*bufp) + (*sizep)*rank, *sizep, rank) < 0) { |
| error ("mvapich: PMGR_/GATHER: rank %d: recv: %m", rank); |
| return (-1); |
| } |
| return (0); |
| } |
| |
| /* |
| * PMGR_SCATTER (root, size of message, then message data) |
| */ |
| static int process_pmgr_scatter (mvapich_state_t *st, int *rootp, |
| int *sizep, void **bufp, int rank) |
| { |
| if (recv_common_value (st, rootp, rank) < 0) |
| return (-1); |
| if (recv_common_value (st, sizep, rank) < 0) |
| return (-1); |
| if (rank != *rootp) |
| return (0); |
| |
| if (*bufp == NULL) |
| *bufp = xmalloc (*sizep * st->nprocs); |
| mvapich_debug3 ("PMGR_SCATTER: recv from rank %d", rank); |
| if (mvapich_recv(st, *bufp, (*sizep) * st->nprocs, rank) < 0) { |
| error ("mvapich: PMGR_SCATTER: rank %d: recv: %m", rank); |
| return (-1); |
| } |
| return (0); |
| } |
| |
| /* |
| * PMGR_ALLGATHER (size of message, then message data) |
| */ |
| static int process_pmgr_allgather (mvapich_state_t *st, int *sizep, |
| void **bufp, int rank) |
| { |
| if (recv_common_value (st, sizep, rank) < 0) |
| return (-1); |
| if (*bufp == NULL) |
| *bufp = xmalloc (*sizep * st->nprocs); |
| |
| mvapich_debug3 ("PMGR_ALLGATHER: recv from rank %d", rank); |
| if (mvapich_recv (st, (*bufp) + *sizep*rank, *sizep, rank) < 0) { |
| error ("mvapich: PMGR_ALLGATHER: rank %d: %m", rank); |
| return (-1); |
| } |
| return (0); |
| } |
| |
| /* |
| * PMGR_ALLTOALL (size of message, then message data) |
| */ |
| static int process_pmgr_alltoall (mvapich_state_t *st, int *sizep, |
| void **bufp, int rank) |
| { |
| if (recv_common_value (st, sizep, rank) < 0) |
| return (-1); |
| |
| if (*bufp == NULL) |
| *bufp = xmalloc (*sizep * st->nprocs * st->nprocs); |
| mvapich_debug3 ("PMGR_ALLTOALL: recv from rank %d", rank); |
| if (mvapich_recv ( st, |
| *bufp + (*sizep * st->nprocs)*rank, |
| *sizep * st->nprocs, rank ) < 0) { |
| error ("mvapich: PMGR_ALLTOALL: recv: rank %d: %m", rank); |
| return (-1); |
| } |
| |
| return (0); |
| } |
| |
| |
| static int mvapich_process_op (mvapich_state_t *st, |
| struct mvapich_info *mvi, int *rootp, int *opcodep, |
| void **bufp, int *sizep) |
| { |
| int rank, code, opcode = -1; |
| int exit = 0; |
| |
| // read in opcode |
| if (recv_common_value (st, opcodep, mvi->rank) < 0) { |
| error ("mvapich: rank %d: Failed to read opcode: %m", |
| mvi->rank); |
| return (-1); |
| } |
| |
| opcode = *opcodep; |
| mvapich_debug3 ("rank %d: opcode=%d", mvi->rank, opcode); |
| |
| // read in additional data depending on current opcode |
| |
| switch (*opcodep) { |
| case 0: // PMGR_OPEN (followed by rank) |
| if (mvapich_recv (st, &rank, sizeof (int), mvi->rank) <= 0) { |
| error ("mvapich: PMGR_OPEN: recv: %m"); |
| exit = 1; |
| } |
| break; |
| case 1: // PMGR_CLOSE (no data, close the socket) |
| close(mvi->fd); |
| break; |
| case 2: // PMGR_ABORT (followed by exit code) |
| if (mvapich_recv (st, &code, sizeof (int), mvi->rank) <= 0) { |
| error ("mvapich: PMGR_ABORT: recv: %m"); |
| } |
| error("mvapich abort with code %d from rank %d", code, mvi->rank); |
| break; |
| case 3: // PMGR_BARRIER (no data) |
| break; |
| case 4: // PMGR_BCAST |
| if (process_pmgr_bcast (st, rootp, sizep, bufp, mvi->rank) < 0) |
| return (-1); |
| break; |
| case 5: // PMGR_GATHER |
| if (process_pmgr_gather (st, rootp, sizep, bufp, mvi->rank) < 0) |
| return (-1); |
| break; |
| case 6: // PMGR_SCATTER |
| if (process_pmgr_scatter (st, rootp, sizep, bufp, mvi->rank) < 0) |
| return (-1); |
| break; |
| case 7: // PMGR_ALLGATHER |
| if (process_pmgr_allgather (st, sizep, bufp, mvi->rank) < 0) |
| return (-1); |
| break; |
| case 8: // PMGR_ALLTOALL |
| if (process_pmgr_alltoall (st, sizep, bufp, mvi->rank) < 0) |
| return (-1); |
| break; |
| default: |
| error("Unrecognized PMGR opcode: %d", opcode); |
| return (-1); |
| } |
| |
| return (exit); |
| } |
| |
| static int mvapich_complete_op (mvapich_state_t *st, int opcode, int root, |
| void *buf, int size) |
| { |
| int rc = 0; |
| |
| switch(opcode) { |
| case 0: // PMGR_OPEN |
| mvapich_debug ("Completed PMGR_OPEN"); |
| break; |
| case 1: // PMGR_CLOSE |
| mvapich_debug ("Completed PMGR_CLOSE"); |
| rc = 1; |
| break; |
| case 2: // PMGR_ABORT |
| mvapich_debug ("Completed PMGR_ABORT"); |
| rc = 1; |
| break; |
| case 3: // PMGR_BARRIER (just echo the opcode back) |
| mvapich_debug ("Completing PMGR_BARRIER"); |
| mvapich_allgatherbcast (st, &opcode, sizeof(opcode)); |
| mvapich_debug ("Completed PMGR_BARRIER"); |
| break; |
| case 4: // PMGR_BCAST |
| mvapich_debug ("Completing PMGR_BCAST"); |
| mvapich_allgatherbcast (st, buf, size); |
| mvapich_debug ("Completed PMGR_BCAST"); |
| break; |
| case 5: // PMGR_GATHER |
| mvapich_debug ("Completing PMGR_GATHER"); |
| mvapich_send (st, buf, size * st->nprocs, root); |
| mvapich_debug ("Completed PMGR_GATHER"); |
| break; |
| case 6: // PMGR_SCATTER |
| mvapich_debug ("Completing PMGR_SCATTER"); |
| mvapich_scatterbcast (st, buf, size); |
| mvapich_debug ("Completed PMGR_SCATTER"); |
| break; |
| case 7: // PMGR_ALLGATHER |
| mvapich_debug ("Completing PMGR_ALLGATHER"); |
| mvapich_allgatherbcast (st, buf, size * st->nprocs); |
| mvapich_debug ("Completed PMGR_ALLGATHER"); |
| break; |
| case 8: // PMGR_ALLTOALL |
| mvapich_debug ("Completing PMGR_ALLTOALL"); |
| mvapich_alltoallbcast (st, buf, size); |
| mvapich_debug ("Completed PMGR_ALLTOALL"); |
| break; |
| default: |
| error("Unrecognized PMGR opcode: %d", opcode); |
| } |
| |
| return (rc); |
| } |
| |
| static int mvapich_pmgr_loop (mvapich_state_t *st) |
| { |
| int opcode = -1; |
| int root = -1; |
| int size = -1; |
| int done = 0; |
| void * buf = NULL; |
| |
| int completed = 0; |
| struct mvapich_info *mvi; |
| struct mvapich_poll *mp; |
| |
| mvapich_debug ("Processing PMGR opcodes"); |
| |
| // for each process, read in one opcode and its associated data |
| mp = mvapich_poll_create (st); |
| while ((mvi = mvapich_poll_next (mp, 1))) { |
| done = mvapich_process_op (st, mvi, &root, &opcode, &buf, &size); |
| completed++; |
| mvi->do_poll = 0; |
| } |
| mvapich_poll_destroy (mp); |
| |
| // Complete any operations |
| done = mvapich_complete_op (st, opcode, root, buf, size); |
| |
| return (done); |
| } |
| |
| |
| /* |
| * This function carries out pmgr_collective operations to |
| * bootstrap MPI. These collective operations are modeled after |
| * MPI collectives -- all tasks must call them in the same order |
| * and with consistent parameters. |
| * |
| * Until a 'CLOSE' or 'ABORT' message is seen, we continuously loop |
| * processing ops |
| * For each op, we read one packet from each rank (socket) |
| * A packet consists of an integer OP CODE, followed by variable |
| * length data depending on the operation |
| * After reading a packet from each rank, srun completes the |
| * operation by broadcasting data back to any destinations, |
| * depending on the operation being performed |
| * |
 * Note: Although there are op codes available for PMGR_OPEN and
 * PMGR_ABORT, neither is fully implemented, so they should not
 * be used.
| */ |
| static int mvapich_processops (mvapich_state_t *st) |
| { |
| /* Until a 'CLOSE' or 'ABORT' message is seen, we continuously |
| * loop processing ops |
| */ |
| mvapich_debug ("Initiated PMGR processing"); |
| while (mvapich_pmgr_loop (st) != 1) {}; |
| mvapich_debug ("Completed processing PMGR opcodes"); |
| |
| return (0); |
| } |
| |
| static void mvapich_bcast (mvapich_state_t *st) |
| { |
| if (!mvapich_dual_phase (st) || st->protocol_phase > 0) |
| return mvapich_bcast_addrs (st); |
| else |
| return mvapich_bcast_hostids (st); |
| } |
| |
| static void mvapich_barrier (mvapich_state_t *st) |
| { |
| int i; |
| struct mvapich_info *m; |
| struct mvapich_poll *mp; |
| /* |
| * Simple barrier to wait for qp's to come up. |
| * Once all processes have written their rank over the socket, |
| * simply write their rank right back to them. |
| */ |
| |
| debug ("mvapich: starting barrier"); |
| mp = mvapich_poll_create (st); |
| while ((m = mvapich_poll_next (mp, 1))) |
| mvapich_read (m, &i, sizeof (i)); |
| |
| debug ("mvapich: completed barrier for all tasks"); |
| |
| mvapich_poll_reset (mp); |
| while ((m = mvapich_poll_next (mp, 0))) |
| mvapich_write (m, &m->rank, sizeof (m->rank)); |
| |
| return; |
| } |
| |
| static void |
| mvapich_print_abort_message (mvapich_state_t *st, int rank, |
| int dest, char *msg, int msglen) |
| { |
| slurm_step_layout_t *sl = st->job->step_layout; |
| char *host; |
| char *msgstr; |
| char time_stamp[256]; |
| |
| if (!mvapich_abort_sends_rank (st)) { |
| info ("mvapich: Received ABORT message from an MPI process."); |
| return; |
| } |
| |
| if (msg && (msglen > 0)) { |
| /* |
| * Remove trailing newline if it exists (syslog will |
| * add newline) |
| */ |
| if (msg [msglen - 1] == '\n') |
| msg [msglen - 1] = '\0'; |
| |
| msgstr = msg; |
| } |
| else { |
| msgstr = ""; |
| msglen = 0; |
| } |
| |
| host = slurm_step_layout_host_name (sl, rank); |
| |
| LOG_TIMESTAMP(time_stamp); |
| if (dest >= 0) { |
| const char *dsthost = slurm_step_layout_host_name (sl, dest); |
| info ("mvapich: %s: ABORT from MPI rank %d " |
| "[on %s] dest rank %d [on %s]", |
| time_stamp, rank, host, dest, dsthost); |
| |
| /* |
| * Log the abort event to syslog |
| * so that system administrators know about possible HW events. |
| */ |
| openlog ("srun", 0, LOG_USER); |
| syslog (LOG_WARNING, |
| "MVAPICH ABORT [jobid=%u.%u src=%d(%s) " |
| "dst=%d(%s)]: %s", |
| st->job->jobid, st->job->stepid, |
| rank, host, dest, dsthost, msgstr); |
| closelog(); |
| } else { |
| info ("mvapich: %s: ABORT from MPI rank %d [on %s]", |
| time_stamp, rank, host); |
| /* |
| * Log the abort event to syslog |
| * so that system administrators know about possible HW events. |
| */ |
| openlog ("srun", 0, LOG_USER); |
| syslog (LOG_WARNING, |
| "MVAPICH ABORT [jobid=%u.%u src=%d(%s) dst=-1()]: %s", |
| st->job->jobid, st->job->stepid, |
| rank, host, msgstr); |
| closelog(); |
| |
| } |
| return; |
| } |
| |
| |
| static int mvapich_abort_timeout (void) |
| { |
| int timeout; |
| |
| if (first_abort_time == 0) |
| return (-1); |
| |
| timeout = 60 - (time (NULL) - first_abort_time); |
| |
| if (timeout < 0) |
| return (0); |
| |
| return (timeout * 1000); |
| } |
| |
| |
| /* |
| * Returns file descriptor from which to read abort message, |
| * -1 on error, or exits if shutdown message is received |
| */ |
| |
| static int mvapich_abort_accept (mvapich_state_t *st) |
| { |
| slurm_addr_t addr; |
| int rc; |
| struct pollfd pfds[2]; |
| |
| /* |
| * st->fd accepts connections from MPI procs to indicate an MPI error |
| * st->shutdown_pipe is written to by the main thread, to break out |
| * of the poll call when it is time to shut down |
| */ |
| |
| pfds[0].fd = st->fd; |
| pfds[0].events = POLLIN; |
| |
| pfds[1].fd = st->shutdown_pipe[0]; |
| pfds[1].events = POLLIN; |
| |
| mvapich_debug3 ("Polling to accept MPI_ABORT timeout=%d", |
| mvapich_abort_timeout ()); |
| |
| /* |
| * limit cancellation to the long periods waiting on this poll |
| */ |
| while ((rc = poll (pfds, 2, mvapich_abort_timeout ())) < 0) { |
| if (errno == EINTR || errno == EAGAIN) |
| continue; |
| |
| return (-1); |
| } |
| |
| /* |
| * If poll() timed out, forcibly kill job and exit instead of |
| * waiting longer for remote IO, process exit, etc. |
| */ |
| if (rc == 0) { |
| mvapich_terminate_job (st, "Timeout waiting for all tasks after ABORT."); |
| /* NORETURN */ |
| } |
| |
| if (pfds[1].revents & POLLIN) { |
| mvapich_thr_exit(st); |
| } |
| return (slurm_accept_msg_conn (st->fd, &addr)); |
| } |
| |
| |
| static void mvapich_wait_for_abort(mvapich_state_t *st) |
| { |
| int src, dst; |
| int ranks[2]; |
| int n; |
| char msg [1024] = ""; |
| int msglen = 0; |
| |
| /* |
| * Wait for abort notification from any process. |
| * For mvapich 0.9.4, it appears that an MPI_Abort is registered |
| * simply by connecting to this socket and immediately closing |
| * the connection. In other versions, the process may write |
| * its rank. |
| */ |
| while (1) { |
| int newfd = mvapich_abort_accept (st); |
| |
| if (newfd == -1) { |
| fatal("MPI master failed to accept (abort-wait)"); |
| } |
| |
| fd_set_blocking (newfd); |
| |
| ranks[1] = -1; |
| if ((n = fd_read_n (newfd, &ranks, sizeof (ranks))) < 0) { |
| error("mvapich: MPI recv (abort-wait) failed"); |
| close (newfd); |
| continue; |
| } |
| |
| /* |
| * If we read both src/dest rank, then also try to |
| * read an error message. If this fails, msglen will |
| * stay zero and no message will be printed. |
| */ |
| if (n == sizeof (ranks)) { |
| dst = ranks[0]; |
| src = ranks[1]; |
| fd_read_n (newfd, &msglen, sizeof (int)); |
| if (msglen) |
| fd_read_n (newfd, msg, msglen); |
| } else { |
| src = ranks[0]; |
| dst = -1; |
| } |
| |
| close(newfd); |
| |
| mvapich_print_abort_message (st, src, dst, msg, msglen); |
| slurm_signal_job_step (st->job->jobid, st->job->stepid, SIGKILL); |
| if (!first_abort_time) |
| first_abort_time = time (NULL); |
| } |
| |
| return; /* but not reached */ |
| } |
| |
| |
| static void mvapich_mvarray_destroy (mvapich_state_t *st) |
| { |
| int i; |
| |
| if (st->mvarray) { |
| for (i = 0; i < st->nprocs; i++) { |
| if (st->mvarray[i]) |
| mvapich_info_destroy(st->mvarray[i]); |
| } |
| xfree(st->mvarray); |
| } |
| } |
| |
| static void do_timings (mvapich_state_t *st, const char *fmt, ...) |
| { |
| static int initialized = 0; |
| static struct timeval initv = { 0, 0 }; |
| struct timeval tv; |
| struct timeval result; |
| char *msg; |
| va_list ap; |
| |
| if (!st->do_timing) |
| return; |
| |
| |
| if (!initialized) { |
| if (gettimeofday (&initv, NULL) < 0) |
| error ("mvapich: do_timings(): gettimeofday(): %m"); |
| initialized = 1; |
| return; |
| } |
| |
| if (gettimeofday (&tv, NULL) < 0) { |
| error ("mvapich: do_timings(): gettimeofday(): %m"); |
| return; |
| } |
| |
| timersub (&tv, &initv, &result); |
| |
| va_start (ap, fmt); |
| msg = vmsg (fmt, ap); |
| va_end (ap); |
| |
| info ("mvapich: %s took %ld.%03ld seconds", msg, |
| (long int)result.tv_sec, (long int)result.tv_usec/1000); |
| |
| xfree (msg); |
| |
| return; |
| } |
| |
| static int mvapich_read_item (struct mvapich_info *mvi, void *buf, size_t size) |
| { |
| size_t nleft; |
| ssize_t n; |
| unsigned char *p; |
| |
| p = buf + mvi->nread; |
| nleft = size - mvi->nread; |
| |
| if ((n = read (mvi->fd, p, nleft)) < 0) { |
| if (errno == EAGAIN) |
| return (EAGAIN); |
| else { |
| error ("mvapich: %d: nread=%d, read (%d, %zx, " |
| "size=%zd, nleft=%zd): %m", |
| mvi->rank, mvi->nread, |
| mvi->fd, (size_t) buf, size, nleft); |
| return (-1); |
| } |
| } |
| |
	if (n == 0) {	/* Unexpected EOF: peer closed the connection */
		error ("mvapich: rank %d: unexpected EOF", mvi->rank);
		return (-1);
	}

	mvi->nread += n;
| if (mvi->nread == size) { |
| mvi->nread = 0; |
| mvi->state++; |
| } |
| |
| return (0); |
| } |
| |
| /* |
| * Process initial mvapich states to read items such as |
| * version, rank, hostidlen, hostids... and so on. |
| * |
 * Current state is tracked in the mvapich_info object itself
| * and state transitions happen automatically in mvapich_read_item() |
| * when the current item is completely read. Early exit from |
| * the state processing may occur based on protocol version. |
| * Similarly, some protocol version may enter state processing |
| * at a different point. |
| * |
| * State processing is considered complete when state == MV_INIT_DONE. |
| * |
| */ |
| static int mvapich_info_process_init (mvapich_state_t *st, |
| struct mvapich_info *mvi) |
| { |
| int rc = 0; |
| |
| again: |
| switch (mvi->state) { |
| case MV_READ_VERSION: |
| mvapich_debug2 ("fd %d: reading mvapich version.", mvi->fd); |
| rc = mvapich_read_item (mvi, &mvi->version, sizeof (mvi->version)); |
| |
| if (mvi->state != MV_READ_RANK) |
| break; |
| |
| case MV_READ_RANK: |
| if (st->protocol_version < 0) |
| st->protocol_version = mvi->version; |
| |
| mvapich_debug2 ("fd %d: reading mvapich rank. version = %d", |
| mvi->fd, mvi->version); |
| |
| rc = mvapich_read_item (mvi, &mvi->rank, sizeof (int)); |
| |
| /* |
| * No hostids in protocol version 3. |
| */ |
| if (mvi->version == 3 && mvi->state == MV_READ_HOSTIDLEN) { |
| mvi->state = MV_READ_ADDRLEN; |
| goto again; |
| } |
| |
| if (mvi->version >= 8 || mvi->state != MV_READ_HOSTIDLEN) |
| break; |
| |
| case MV_READ_HOSTIDLEN: |
| mvapich_debug2 ("rank %d: reading hostidlen.", mvi->rank); |
| |
| mvi->hostidlen = 0; |
| rc = mvapich_read_item (mvi, &mvi->hostidlen, sizeof (mvi->hostidlen)); |
| |
| if (mvi->state != MV_READ_HOSTID) |
| break; |
| |
| case MV_READ_HOSTID: |
| if (mvi->hostidlen != sizeof (int)) { |
| error ("mvapich: rank %d: unexpected hostidlen = %d", |
| mvi->rank, mvi->hostidlen); |
| return (-1); |
| } |
| mvapich_debug2 ("rank %d: reading hostid. hostidlen = %d", |
| mvi->rank, mvi->hostidlen); |
| |
| rc = mvapich_read_item (mvi, &mvi->hostid, mvi->hostidlen); |
| |
| if (mvi->state != MV_READ_ADDRLEN || mvi->version > 3) |
| break; |
| |
| case MV_READ_ADDRLEN: |
| mvapich_debug2 ("rank %d: read addrlen.", mvi->rank); |
| |
| rc = mvapich_read_item (mvi, &mvi->addrlen, sizeof (mvi->addrlen)); |
| |
| if (mvi->state != MV_READ_ADDRS) |
| break; |
| |
| case MV_READ_ADDRS: |
| mvapich_debug2 ("rank %d: read addr. addrlen = %d", |
| mvi->rank, mvi->addrlen); |
| |
| mvi->addr = xmalloc (mvi->addrlen); |
| rc = mvapich_read_item (mvi, mvi->addr, mvi->addrlen); |
| |
| if (mvi->state != MV_READ_PIDLEN || !mvapich_requires_pids (st)) |
| break; |
| |
| case MV_READ_PIDLEN: |
| mvapich_debug2 ("rank %d: read pidlen", mvi->rank); |
| |
| rc = mvapich_read_item (mvi, &mvi->pidlen, sizeof (int)); |
| |
| if (mvi->state != MV_READ_PID) |
| break; |
| |
| case MV_READ_PID: |
| mvapich_debug2 ("rank %d: read pid: pidlen = %d", |
| mvi->rank, mvi->pidlen); |
| |
| mvi->pid = xmalloc (mvi->pidlen); |
| |
| rc = mvapich_read_item (mvi, mvi->pid, mvi->pidlen); |
| |
| break; |
| |
| case MV_INIT_DONE: |
| break; |
| } |
| |
| /* |
| * If protocol doesn't read PIDs, we're done after ADDRs |
| */ |
| if (mvi->state == MV_READ_PIDLEN && !mvapich_requires_pids (st)) |
| mvi->state = MV_INIT_DONE; |
| |
| /* |
	 * Protocol versions 5 and 6: Done after reading HOSTID
| */ |
| if (mvi->state == MV_READ_ADDRLEN && mvi->version >= 5) |
| mvi->state = MV_INIT_DONE; |
| |
| /* |
| * Protocol version 8: Done after reading RANK |
| */ |
| if (mvi->state == MV_READ_HOSTIDLEN && mvi->version == 8) |
| mvi->state = MV_INIT_DONE; |
| |
| return (rc); |
| } |
| |
| |
| /* |
| * Accept as many new connections as possible and place them on |
| * the next available slot in the mvarray. |
| */ |
| static int mvapich_accept_new (mvapich_state_t *st) |
| { |
| slurm_addr_t addr; |
| int fd; |
| |
| /* |
| * Accept as many new connections as possible |
| */ |
| while (1) { |
| if ( ((fd = slurm_accept_msg_conn (st->fd, &addr)) < 0) |
| && errno == EAGAIN) { |
| mvapich_debug2 ("mvapich: accept new: %m"); |
| return (0); |
| } |
| else if (fd < 0) { |
| error ("mvapich: accept: %m"); |
| return (-1); |
| } |
| |
| if (st->nconnected == 0 && st->protocol_phase == 0) { |
| mvapich_debug ("first task connected"); |
| do_timings (st, NULL); |
| /* |
| * Officially start timeout timer now. |
| */ |
| st->start_time = time (NULL); |
| } |
| |
| fd_set_nonblocking (fd); |
| |
| st->mvarray[st->nconnected] = mvapich_info_create (); |
| st->mvarray[st->nconnected]->fd = fd; |
| st->nconnected++; |
| |
| mvapich_debug3 ("Got connection %d: fd=%d", st->nconnected, fd); |
| } |
| |
| return (0); |
| } |
| |
| /* |
| * Accept new connections on st->fd and process them with the |
| * function [fn]. The poll loop preferentially handles incoming |
| * connections to avoid exceeding the socket listen queue, which can |
| * be quite likely when launching very large jobs. |
| * |
| * When there are no connections waiting, and existing connections register |
| * read activity, these connections are processed using [fn], until |
| * such time as the mvapich_info state == MV_INIT_DONE. |
| * |
| * Returns 0 after all successful connections made |
| * -1 on an error |
| * Exits if st->shutdown_pipe is written to |
| */ |
| static int |
| mvapich_initialize_connections (mvapich_state_t *st, |
| int (fn) (mvapich_state_t *, struct mvapich_info *) ) |
| { |
| int i, j; |
| int nfds; |
| int ncompleted; |
| int rc = 0; |
| int printonce = 0; |
| struct mvapich_info **mvmap; |
| struct pollfd *fds; |
| |
| fds = xmalloc ((st->nprocs+2) * sizeof (struct pollfd)); |
| mvmap = xmalloc (st->nprocs * sizeof (struct mvapich_info *)); |
| st->nconnected = 0; |
| |
| while (1) { |
| |
| memset (fds, 0, sizeof (struct pollfd) * (st->nprocs + 2)); |
| memset (mvmap, 0, sizeof (struct mvapich_info *) * st->nprocs); |
| |
| /* |
| * Listen socket |
| */ |
| fds[0].fd = st->fd; |
| fds[0].events = POLLIN; |
| |
| /* |
| * Shutdown pipe |
| */ |
| fds[1].fd = st->shutdown_pipe[0]; |
| fds[1].events = POLLIN; |
| |
| j = 2; |
| nfds = 2; |
| ncompleted = 0; |
| |
| if (st->nconnected < st->nprocs) |
| mvapich_debug2 ("Waiting for connection %d/%d", |
| st->nconnected + 1, st->nprocs); |
| |
| for (i = 0; i < st->nconnected; i++) { |
| struct mvapich_info *m = st->mvarray[i]; |
| |
| if (m->fd >= 0 && m->state < MV_INIT_DONE) { |
| mvmap[j-2] = m; |
| fds[j].fd = m->fd; |
| fds[j].events = POLLIN; |
| j++; |
| nfds++; |
| } |
			else if (m->fd >= 0 && m->state == MV_INIT_DONE)
| ncompleted++; |
| } |
| |
| if (st->nconnected == st->nprocs && !printonce) { |
| mvapich_debug ("Got %d connections.", st->nprocs); |
| do_timings (st, "Accept %d connection%s%s", |
| st->nprocs, st->nprocs == 1 ? "" : "s", |
| st->protocol_phase ? " (phase 2)" : ""); |
| printonce = 1; |
| } |
| |
| if (ncompleted == st->nprocs) { |
| do_timings (st, "Read info for %d task%s%s", |
| st->nprocs, st->nprocs == 1 ? "" : "s", |
| st->protocol_phase ? " (phase 2)" : ""); |
| break; /* All done. */ |
| } |
| |
| mvapich_debug3 ("do_poll (nfds=%d)", nfds); |
| |
| while ((rc = poll (fds, nfds, startup_timeout (st))) < 0) { |
| if (errno == EINTR || errno == EAGAIN) |
| continue; |
| error ("mvapich: poll: %m"); |
| break; |
| } |
| if (rc == 0) { |
| report_absent_tasks (st, 1); |
| mvapich_terminate_job (st, NULL); |
| } |
| |
| mvapich_debug3 ("poll (nfds=%d) = %d", nfds, rc); |
| |
| /* |
| * Stop other work if told to shut down |
| */ |
| if (fds[1].revents == POLLIN) { |
| xfree (fds); |
| xfree (mvmap); |
| mvapich_thr_exit(st); |
| } |
| |
| /* |
| * Preferentially accept new connections. |
| */ |
| if (fds[0].revents == POLLIN) { |
| if ((rc = mvapich_accept_new (st)) < 0) |
| break; |
| continue; |
| } |
| |
| /* |
| * If there are no pending connections, handle read |
| * activity with passed in function [fn]. |
| */ |
| for (i = 0; i < st->nconnected; i++) { |
| if (fds[i+2].revents == POLLIN) { |
| if ((rc = (*fn) (st, mvmap[i])) < 0) |
| goto out; |
| } |
| } |
| } |
| |
| out: |
| xfree (fds); |
| xfree (mvmap); |
| return (rc); |
| } |
| |
| |
| static int mvapich_phase_two (mvapich_state_t *st) |
| { |
| struct mvapich_poll *mp; |
| struct mvapich_info *mvi; |
| int i; |
| |
| /* |
| * For phase 2, start reading addrlen for all tasks: |
| */ |
| for (i = 0; i < st->nprocs; i++) |
| st->mvarray[i]->state = MV_READ_ADDRLEN; |
| |
| mvapich_debug ("Reading addrs from all tasks"); |
| mp = mvapich_poll_create (st); |
| while ((mvi = mvapich_poll_next (mp, 1))) { |
| |
| mvapich_info_process_init (st, mvi); |
| |
| if (mvi->state == MV_INIT_DONE) |
| mvi->do_poll = 0; |
| } |
| mvapich_poll_destroy (mp); |
| |
| do_timings (st, "Reading addrs from %d tasks", st->nprocs); |
| |
| mvapich_bcast_addrs (st); |
| |
| do_timings (st, "Bcast addrs to %d tasks", st->nprocs); |
| |
| return (0); |
| } |
| |
| static int read_phase2_header (mvapich_state_t *st, struct mvapich_info *mvi) |
| { |
| int rc; |
| |
| /* |
| * Phase 2 header is just our rank, so we know who the |
| * new connection is coming from. |
| */ |
| if ((rc = mvapich_read (mvi, &mvi->rank, sizeof (mvi->rank))) < 0) |
| error ("mvapich_read: %m"); |
| /* |
| * mvapich_read resets do_poll if we're done reading. |
| * Use this to set our state to MV_INIT_DONE so we don't continue |
| * to poll on this fd. |
| */ |
| if (mvi->do_poll == 0) |
| mvi->state = MV_INIT_DONE; |
| |
| return (rc); |
| } |
| |
| static int mvapich_handle_phase_two (mvapich_state_t *st) |
| { |
| mvapich_debug ("protocol phase 0 complete. beginning phase 2."); |
| |
| st->protocol_phase = 1; |
| |
| /* |
| * Phase 2 is either in "connect_once" mode, where we reuse |
| * the existing connection (easy), or we have to handle the |
| * remote tasks reconnecting and re-sending their ranks |
| * before restarting the protocol. Since we don't know which |
| * connection is from which rank, we have to use a temporary |
| * mvapich_info array until all ranks have been read. |
| */ |
| if (!st->connect_once) { |
| struct mvapich_info **mvarray = st->mvarray; |
| int i; |
| |
| mvapich_debug ("Waiting for %d ranks to reconnect", st->nprocs); |
| |
| /* |
| * Create temporary mvarray to handle incoming connections |
| */ |
| st->mvarray = xmalloc (st->nprocs * sizeof (struct mvapich_info *)); |
| |
| /* |
| * Accept all incoming connections and read the header (rank). |
| */ |
| if (mvapich_initialize_connections (st, read_phase2_header) < 0) |
| mvapich_terminate_job (st, "Failed to initialize phase 2"); |
| |
| do_timings (st, "Phase 2 reconnect"); |
| |
| /* |
| * Now reassign mvi->fds in the real mvarray, and copy |
| * this back to st->mvarray. |
| */ |
		for (i = 0; i < st->nprocs; i++) {
			struct mvapich_info *mvi = st->mvarray[i];
			mvarray[mvi->rank]->fd = mvi->fd;
			mvapich_info_destroy (mvi);  /* free temporary */
		}
| |
| xfree (st->mvarray); |
| st->mvarray = mvarray; |
| } |
| |
| /* |
| * Finish processing phase two. |
| */ |
| mvapich_phase_two (st); |
| |
| return (0); |
| } |
| |
| /* |
 * Initialize all NPROCS connections
| */ |
| static void mvapich_connection_init (mvapich_state_t *st) |
| { |
| struct mvapich_info **mva; |
| int i; |
| |
| st->mvarray = xmalloc (st->nprocs * sizeof (*(st->mvarray))); |
| |
| /* |
| * Get initial connections and read task header information: |
| */ |
| if (mvapich_initialize_connections (st, mvapich_info_process_init) < 0) |
| goto fail; |
| |
| /* |
| * Sort mvarray in rank order. The rest of the startup code |
| * expects this. |
| */ |
| mva = xmalloc (st->nprocs * sizeof (*mva)); |
| for (i = 0; i < st->nprocs; i++) { |
| if ((mva[i] = mvapich_info_find (st, i)) == NULL) { |
| error ("mvapich: failed to find rank %d!", i); |
| goto fail; |
| } |
| } |
| xfree (st->mvarray); |
| st->mvarray = mva; |
| |
| return; |
| |
| fail: |
| mvapich_terminate_job (st, "Fatal error. Killing job"); |
| return; |
| } |
| |
| /* |
| * Close all fds in mvarray |
| */ |
| static void mvapich_close_fds (mvapich_state_t *st) |
| { |
| int i; |
| for (i = 0; i < st->nprocs; i++) { |
| struct mvapich_info *mvi = st->mvarray[i]; |
| close (mvi->fd); |
| mvi->fd = -1; |
| } |
| } |
| |
| /* |
| * This separate mvapich thread handles the MVAPICH startup |
| * protocol (tries to handle the many versions of it...). |
| */ |
| static void *mvapich_thr(void *arg) |
| { |
| mvapich_state_t *st = arg; |
| |
| /* |
| * Accept and initialize all remote task connections: |
| */ |
| mvapich_connection_init (st); |
| |
| /* |
| * Process subsequent phases of various protocol versions. |
| */ |
| if (st->protocol_version == 8) { |
| if (mvapich_processops (st) < 0) |
| mvapich_terminate_job (st, "mvapich_processops failed."); |
| } |
| else { |
| mvapich_debug ("bcasting mvapich info to %d tasks", st->nprocs); |
| mvapich_bcast (st); |
| do_timings (st,"Bcasting mvapich info to %d tasks", st->nprocs); |
| |
| if (mvapich_dual_phase (st) && st->protocol_phase == 0) { |
| if (mvapich_handle_phase_two (st) < 0) |
| mvapich_terminate_job (st, "Phase 2 failed."); |
| } |
| |
| do_timings (st, "Phase 2"); |
| |
| mvapich_debug ("calling mvapich_barrier"); |
| mvapich_barrier (st); |
| mvapich_debug ("all tasks have checked in"); |
| mvapich_close_fds (st); |
| } |
| |
| do_timings (st, "MVAPICH initialization"); |
| mvapich_wait_for_abort (st); |
| return (NULL); |
| } |
| |
| |
| static int process_environment (mvapich_state_t *st) |
| { |
| char *val; |
| |
| if (getenv ("MVAPICH_CONNECT_TWICE")) |
| st->connect_once = 0; |
| |
| if ((val = getenv ("SLURM_MVAPICH_DEBUG"))) { |
| int level = atoi (val); |
| if (level > 0) |
| mvapich_verbose = level; |
| } |
| |
| if (getenv ("SLURM_MVAPICH_TIMING")) |
| st->do_timing = 1; |
| |
| if ((val = getenv ("SLURM_MVAPICH_TIMEOUT"))) { |
| st->timeout = atoi (val); |
| } |
| |
| return (0); |
| } |
| |
| static mvapich_state_t * |
| mvapich_state_create(const mpi_plugin_client_info_t *job) |
| { |
| mvapich_state_t *state; |
| |
| state = (mvapich_state_t *)xmalloc(sizeof(mvapich_state_t)); |
| |
| state->tid = (pthread_t)-1; |
| state->mvarray = NULL; |
| state->fd = -1; |
| state->nprocs = job->step_layout->task_cnt; |
| state->protocol_version = -1; |
| state->protocol_phase = 0; |
| state->connect_once = 1; |
| state->do_timing = 0; |
| state->timeout = 600; |
| state->shutdown_timeout = 5; |
| |
| if (pipe(state->shutdown_pipe) < 0) { |
| error ("mvapich_state_create: pipe: %m"); |
| xfree(state); |
| return (NULL); |
| } |
| fd_set_nonblocking(state->shutdown_pipe[0]); |
| fd_set_nonblocking(state->shutdown_pipe[1]); |
| state->shutdown_complete = false; |
| |
| slurm_mutex_init(&state->shutdown_lock); |
| pthread_cond_init(&state->shutdown_cond, NULL); |
| |
| *(state->job) = *job; |
| |
| return state; |
| } |
| |
| static void mvapich_state_destroy(mvapich_state_t *st) |
| { |
| mvapich_mvarray_destroy(st); |
| |
| close(st->shutdown_pipe[0]); |
| close(st->shutdown_pipe[1]); |
| |
| slurm_mutex_destroy(&st->shutdown_lock); |
| pthread_cond_destroy(&st->shutdown_cond); |
| |
| xfree(st); |
| } |
| |
| /* |
| * Create a unique MPIRUN_ID for jobid/stepid pairs. |
| * Combine the least significant bits of the jobid and stepid |
| * |
| * The MPIRUN_ID is used by MVAPICH to create shmem files in /tmp, |
| * so we have to make sure multiple jobs and job steps on the |
| * same node have different MPIRUN_IDs. |
| */ |
| int mpirun_id_create(const mpi_plugin_client_info_t *job) |
| { |
| return (int) ((job->jobid << 16) | (job->stepid & 0xffff)); |
| } |
| |
| /* |
| * Returns the port number in host byte order. |
| */ |
| static short _sock_bind_wild(int sockfd) |
| { |
| socklen_t len; |
| struct sockaddr_in sin; |
| |
| memset(&sin, 0, sizeof(sin)); |
| sin.sin_family = AF_INET; |
| sin.sin_addr.s_addr = htonl(INADDR_ANY); |
| sin.sin_port = htons(0); /* bind ephemeral port */ |
| |
| if (bind(sockfd, (struct sockaddr *) &sin, sizeof(sin)) < 0) |
| return (-1); |
| len = sizeof(sin); |
| if (getsockname(sockfd, (struct sockaddr *) &sin, &len) < 0) |
| return (-1); |
| return ntohs(sin.sin_port); |
| } |
| |
| |
| int do_listen (int *fd, short *port) |
| { |
| int rc, val; |
| |
| if ((*fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) |
| return -1; |
| |
| val = 1; |
| rc = setsockopt(*fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(int)); |
	if (rc < 0)
		goto cleanup;
| |
	*port = _sock_bind_wild(*fd);
	if (*port < 0)
		goto cleanup;

	rc = listen(*fd, 2048);
| |
| if (rc < 0) |
| goto cleanup; |
| |
| return 1; |
| |
| cleanup: |
| close(*fd); |
| return -1; |
| |
| } |
| |
| extern mvapich_state_t *mvapich_thr_create(const mpi_plugin_client_info_t *job, |
| char ***env) |
| { |
| short port; |
| pthread_attr_t attr; |
| mvapich_state_t *st = NULL; |
| |
| st = mvapich_state_create(job); |
| if (!st) { |
| error ("mvapich: Failed initialization"); |
| return NULL; |
| } |
| if (process_environment (st) < 0) { |
| error ("mvapich: Failed to read environment settings"); |
| mvapich_state_destroy(st); |
| return NULL; |
| } |
| if (do_listen (&st->fd, &port) < 0) { |
| error ("Unable to create ib listen port: %m"); |
| mvapich_state_destroy(st); |
| return NULL; |
| } |
| |
| fd_set_nonblocking (st->fd); |
| |
| /* |
| * Accept in a separate thread. |
| */ |
| slurm_attr_init(&attr); |
| pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); |
| if (pthread_create(&st->tid, &attr, &mvapich_thr, (void *)st)) { |
| slurm_attr_destroy(&attr); |
| mvapich_state_destroy(st); |
| return NULL; |
| } |
| slurm_attr_destroy(&attr); |
| |
| /* |
| * Set some environment variables in current env so they'll get |
| * passed to all remote tasks |
| */ |
| env_array_overwrite_fmt(env, "MPIRUN_PORT", "%hu", port); |
| env_array_overwrite_fmt(env, "MPIRUN_NPROCS", "%d", st->nprocs); |
| env_array_overwrite_fmt(env, "MPIRUN_ID", "%d", mpirun_id_create(job)); |
| if (st->connect_once) { |
| env_array_overwrite_fmt(env, "MPIRUN_CONNECT_ONCE", "1"); |
| } |
| |
| verbose ("mvapich-0.9.x,1.0.x master listening on port %hu", port); |
| |
| return st; |
| } |
| |
| /* |
| * The main thread calls this function to terminate the mpi thread and clean |
| * up. A write to this pipe will break the mpi thread out of one of two poll |
| * calls--the wait for mpi abort messages and the wait for initial connections. |
| * The mpi thread will spend most of its time in the first location if this |
| * is an mpi job, and the second location if this is not an mpi job. When the |
| * mpi thread sees activity on this pipe, it will set st->shutdown_complete = |
| * true and then pthread_exit(). If the mpi thread is not blocked on either of |
| * those polls, and does not reach either poll within st->shutdown_timeout |
| * secs, the main thread returns. The main thread could call pthread_cancel |
| * if it can't shutdown nicely, but there's a danger the thread could be |
| * cancelled while it has a mutex locked, especially while logging. |
| */ |
| extern int mvapich_thr_destroy(mvapich_state_t *st) |
| { |
| if (st != NULL) { |
| if (st->tid != (pthread_t)-1) { |
| char tmp = 1; |
| int n; |
| |
| n = write(st->shutdown_pipe[1], &tmp, 1); |
| if (n == 1) { |
| struct timespec ts = {0, 0}; |
| |
| slurm_mutex_lock(&st->shutdown_lock); |
| ts.tv_sec = time(NULL) + st->shutdown_timeout; |
| |
| while (!st->shutdown_complete) { |
| if (time(NULL) >= ts.tv_sec) { |
| break; |
| } |
| pthread_cond_timedwait( |
| &st->shutdown_cond, |
| &st->shutdown_lock, &ts); |
| } |
| slurm_mutex_unlock(&st->shutdown_lock); |
| } |
| } |
| if (st->shutdown_complete) { |
| mvapich_state_destroy(st); |
| } |
| } |
| return SLURM_SUCCESS; |
| } |
| |
| |
| void mvapich_thr_exit(mvapich_state_t *st) |
| { |
| pthread_mutex_lock(&st->shutdown_lock); |
| |
| st->shutdown_complete = true; |
| |
| pthread_cond_signal(&st->shutdown_cond); |
| pthread_mutex_unlock(&st->shutdown_lock); |
| |
| pthread_exit(NULL); |
| } |
| |
| |
| |