| /*****************************************************************************\ |
| ** pmix_utils.c - Various PMIx utility functions |
| ***************************************************************************** |
| * Copyright (C) 2014-2015 Artem Polyakov. All rights reserved. |
| * Copyright (C) 2015-2017 Mellanox Technologies. All rights reserved. |
| * Written by Artem Polyakov <artpol84@gmail.com, artemp@mellanox.com>. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #include "config.h" |
| |
| #include <dirent.h> |
| #include <errno.h> |
| #include <limits.h> |
| #include <poll.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <sys/socket.h> |
| #include <sys/stat.h> |
| #include <sys/types.h> |
| #include <sys/uio.h> |
| #include <sys/un.h> |
| #include <time.h> |
| #include <unistd.h> |
| #include <dirent.h> |
| #include <netinet/in.h> |
| #include <netinet/tcp.h> |
| |
| #include "pmixp_common.h" |
| #include "pmixp_utils.h" |
| #include "pmixp_debug.h" |
| |
| /* must come after the above pmixp includes */ |
| #include "src/common/forward.h" |
| |
/*
 * Count the base-10 digits of @val.
 *
 * The result is the number of times @val can be divided by 10 before it
 * becomes zero, so an input of 0 yields 0 (not 1).
 */
extern int pmixp_count_digits_base10(uint32_t val)
{
	int ndigits;

	for (ndigits = 0; val != 0; val /= 10)
		ndigits++;

	return ndigits;
}
| |
| void pmixp_free_buf(void *x) |
| { |
| buf_t *buf = (buf_t *) x; |
| FREE_NULL_BUFFER(buf); |
| } |
| |
| int pmixp_usock_create_srv(char *path) |
| { |
| static struct sockaddr_un sa; |
| int ret = 0; |
| |
| if (strlen(path) >= sizeof(sa.sun_path)) { |
| PMIXP_ERROR_STD("UNIX socket path is too long: %lu, max %lu", |
| (unsigned long) strlen(path), |
| (unsigned long) sizeof(sa.sun_path) - 1); |
| return SLURM_ERROR; |
| } |
| |
| int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0); |
| if (fd < 0) { |
| PMIXP_ERROR_STD("Cannot create UNIX socket"); |
| return SLURM_ERROR; |
| } |
| |
| memset(&sa, 0, sizeof(sa)); |
| sa.sun_family = AF_UNIX; |
| strcpy(sa.sun_path, path); |
| if ((ret = bind(fd, (struct sockaddr *)&sa, SUN_LEN(&sa)))) { |
| PMIXP_ERROR_STD("Cannot bind() UNIX socket %s", path); |
| goto err_fd; |
| } |
| |
| if ((ret = listen(fd, 64))) { |
| PMIXP_ERROR_STD("Cannot listen(%d, 64) UNIX socket %s", fd, |
| path); |
| goto err_bind; |
| |
| } |
| return fd; |
| |
| err_bind: |
| unlink(path); |
| err_fd: |
| close(fd); |
| return ret; |
| } |
| |
/*
 * Read up to @count bytes from @sd into @buf.
 *
 * sd       - descriptor to read from
 * buf      - destination buffer of at least @count bytes
 * count    - number of bytes wanted
 * shutdown - out: 0 if the descriptor is still usable, 1 if the peer closed
 *            the connection, -errno on a read error
 * blocking - if true the descriptor is switched to blocking mode for the
 *            duration of the call and switched back to non-blocking mode
 *            before returning, on every exit path
 *
 * Returns the number of bytes actually stored in @buf; this is less than
 * @count when the read would block, the peer closed, or an error occurred.
 */
size_t pmixp_read_buf(int sd, void *buf, size_t count, int *shutdown,
		      bool blocking)
{
	size_t offs = 0;
	ssize_t ret;
	int err;

	*shutdown = 0;

	if (blocking) {
		fd_set_blocking(sd);
	}

	while (offs < count) {
		ret = read(sd, (char *)buf + offs, count - offs);
		if (ret > 0) {
			offs += ret;
			continue;
		}
		if (ret == 0) {
			/* connection closed. */
			*shutdown = 1;
			break;
		}

		/* capture errno before anything else gets a chance to reset it */
		err = errno;
		if (err == EINTR) {
			continue;
		}
		if ((err != EWOULDBLOCK) && (err != EAGAIN)) {
			PMIXP_ERROR_STD("blocking=%d", blocking);
			*shutdown = -err;
		}
		/* would-block is only expected in non-blocking mode */
		break;
	}

	/* always leave the descriptor the way the success path leaves it */
	if (blocking) {
		fd_set_nonblocking(sd);
	}
	return offs;
}
| |
| int pmixp_fd_set_nodelay(int fd) |
| { |
| int val = 1; |
| if ( 0 > setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val, |
| sizeof(val)) ) { |
| PMIXP_ERROR_STD("Cannot set TCP_NODELAY on fd = %d\n", fd); |
| return SLURM_ERROR; |
| } |
| return SLURM_SUCCESS; |
| } |
| |
/*
 * Write up to @count bytes from @buf to @sd.
 *
 * sd       - descriptor to write to
 * buf      - data to send
 * count    - number of bytes to send
 * shutdown - out: 0 if no error was detected, -errno on a write error; in
 *            non-blocking mode it may also carry whatever
 *            pmixp_fd_write_ready() reported
 * blocking - if true the descriptor is switched to blocking mode for the
 *            duration of the call and switched back to non-blocking mode
 *            before returning, on every exit path; if false the descriptor
 *            is first polled for writability and 0 is returned when it is
 *            not ready
 *
 * Returns the number of bytes written, which may be less than @count.
 */
size_t pmixp_write_buf(int sd, void *buf, size_t count, int *shutdown,
		       bool blocking)
{
	size_t offs = 0;
	ssize_t ret;
	int err;

	*shutdown = 0;

	if (!blocking && !pmixp_fd_write_ready(sd, shutdown)) {
		return 0;
	}

	if (blocking) {
		fd_set_blocking(sd);
	}

	while (offs < count) {
		ret = write(sd, (char *)buf + offs, count - offs);
		if (ret > 0) {
			offs += ret;
			continue;
		}

		err = errno;
		if (err == EINTR) {
			continue;
		}
		if ((err != EWOULDBLOCK) && (err != EAGAIN)) {
			*shutdown = -err;
		}
		break;
	}

	/* always leave the descriptor the way the success path leaves it */
	if (blocking) {
		fd_set_nonblocking(sd);
	}

	return offs;
}
| |
/*
 * Drop the first @offset bytes from an iovec array in place.
 *
 * iov    - array to adjust; entries that are fully covered by @offset are
 *          removed and the remaining ones are moved to the front
 * iovcnt - number of valid entries in @iov
 * offset - number of leading bytes that were already consumed
 *
 * Returns the number of entries that still carry data. When that is zero
 * the array content is left untouched.
 */
static int _iov_shift(struct iovec *iov, size_t iovcnt, size_t offset)
{
	size_t skip, i, remaining;
	size_t consumed = 0;

	/* find out how many iov's were completely sent */
	for (skip = 0; skip < iovcnt; skip++) {
		if (offset < consumed + iov[skip].iov_len) {
			break;
		}
		consumed += iov[skip].iov_len;
	}

	remaining = iovcnt - skip;
	if (!remaining) {
		/* everything was consumed: there is no iov[0] to adjust */
		return 0;
	}

	/* remove those iov's from the list */
	for (i = 0; i < remaining; i++) {
		iov[i] = iov[i + skip];
	}

	/* shift the current (partially sent) iov */
	offset -= consumed;
	iov[0].iov_base = (char *)iov[0].iov_base + offset;
	iov[0].iov_len -= offset;
	return (int) remaining;
}
| |
/*
 * Gather-write the content of an iovec array to @sd, resuming at @offset.
 *
 * sd       - descriptor to write to
 * iov      - array describing the data; it is modified in place as data
 *            gets consumed (see _iov_shift())
 * iovcnt   - number of entries in @iov
 * offset   - number of leading bytes of the array that were already sent
 * shutdown - out: 0 if no error was detected, -errno on a write error
 *
 * Returns the number of bytes written by this call (not counting @offset).
 * The call returns early when writev() reports that it would block.
 */
size_t pmixp_writev_buf(int sd, struct iovec *iov, size_t iovcnt,
			size_t offset, int *shutdown)
{
	size_t total = 0, sent = 0;
	size_t idx;
	ssize_t nbytes;

	/* overall payload size, including the part that was sent earlier */
	for (idx = 0; idx < iovcnt; idx++)
		total += iov[idx].iov_len;

	/* skip what previous calls have already pushed out */
	iovcnt = _iov_shift(iov, iovcnt, offset);

	*shutdown = 0;

	while (offset + sent != total) {
		nbytes = writev(sd, iov, iovcnt);
		if (nbytes > 0) {
			sent += nbytes;
			iovcnt = _iov_shift(iov, iovcnt, nbytes);
			continue;
		}

		if (errno == EINTR)
			continue;

		/* would-block is not an error, anything else is */
		if (errno != EWOULDBLOCK)
			*shutdown = -errno;
		return sent;
	}

	return sent;
}
| |
/*
 * Check, without waiting, whether @fd has data available for reading.
 *
 * fd       - descriptor to poll
 * shutdown - out: reset to 0 first; set to -errno if poll() itself failed
 *            (other than EINTR), to -EBADF if poll() reported POLLERR or
 *            POLLNVAL, or to 1 if it reported POLLHUP. The revents-based
 *            values are only reported when no input is pending.
 *
 * Returns true if POLLIN is set for the descriptor, false otherwise.
 */
bool pmixp_fd_read_ready(int fd, int *shutdown)
{
	/* revents is zeroed so it is never read uninitialised */
	struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
	bool ready;
	int rc;

	/* Drop shutdown before the check */
	*shutdown = 0;

	rc = poll(&pfd, 1, 0);
	if (rc < 0) {
		/* an interrupted poll is simply reported as "not ready" */
		if (errno != EINTR) {
			*shutdown = -errno;
		}
		return false;
	}

	ready = ((rc == 1) && (pfd.revents & POLLIN));
	if (!ready && (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
		if (pfd.revents & (POLLERR | POLLNVAL)) {
			*shutdown = -EBADF;
		} else {
			/* POLLHUP - normal connection close */
			*shutdown = 1;
		}
	}
	return ready;
}
| |
/*
 * Check whether @fd can be written to, waiting for up to 10 ms.
 *
 * fd       - descriptor to poll
 * shutdown - out: reset to 0 first; set to -errno if poll() itself failed
 *            (other than EINTR), to -EBADF if poll() reported POLLERR or
 *            POLLNVAL, or to 1 if it reported POLLHUP
 *
 * poll() is retried when it is interrupted by a signal, but only while the
 * total time spent stays below 10 ms.
 *
 * Returns true if POLLOUT is set for the descriptor, false otherwise.
 */
bool pmixp_fd_write_ready(int fd, int *shutdown)
{
	struct pollfd pfd[1];
	int rc = 0;
	struct timeval tv;
	double start, cur;

	pfd[0].fd = fd;
	pfd[0].events = POLLOUT;
	pfd[0].revents = 0;

	/* Drop shutdown before the check, same as pmixp_fd_read_ready() */
	*shutdown = 0;

	gettimeofday(&tv, NULL);
	start = tv.tv_sec + 1E-6 * tv.tv_usec;
	cur = start;
	while ((cur - start) < 0.01) {
		rc = poll(pfd, 1, 10);

		/* update current timestamp */
		gettimeofday(&tv, NULL);
		cur = tv.tv_sec + 1E-6 * tv.tv_usec;
		if (0 > rc) {
			if (errno == EINTR) {
				continue;
			} else {
				*shutdown = -errno;
				return false;
			}
		}
		break;
	}

	if (pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
		if (pfd[0].revents & (POLLERR | POLLNVAL)) {
			*shutdown = -EBADF;
		} else {
			/* POLLHUP - normal connection close */
			*shutdown = 1;
		}
	}
	return ((rc == 1) && (pfd[0].revents & POLLOUT));
}
| |
| int pmixp_stepd_send(const char *nodelist, const char *address, |
| const char *data, uint32_t len, |
| unsigned int start_delay, |
| unsigned int retry_cnt, int silent) |
| { |
| |
| int retry = 0, rc = SLURM_SUCCESS; |
| unsigned int delay = start_delay; /* in milliseconds */ |
| char *copy_of_nodelist = xstrdup(nodelist); |
| |
| while (1) { |
| if (!silent && retry >= 1) { |
| PMIXP_DEBUG("send failed, rc=%d, try #%d", rc, retry); |
| } |
| |
| rc = slurm_forward_data(©_of_nodelist, (char *)address, |
| len, data); |
| |
| if (rc == SLURM_SUCCESS) |
| break; |
| |
| retry++; |
| if (retry >= retry_cnt) { |
| PMIXP_ERROR("send failed, rc=%d, exceeded the retry limit", rc); |
| break; |
| } |
| |
| /* wait with constantly increasing delay */ |
| struct timespec ts = |
| {(delay / MSEC_IN_SEC), |
| ((delay % MSEC_IN_SEC) * NSEC_IN_MSEC)}; |
| nanosleep(&ts, NULL); |
| delay *= 2; |
| } |
| xfree(copy_of_nodelist); |
| |
| return rc; |
| } |
| |
| static int _pmix_p2p_send_core(const char *nodename, const char *address, |
| const char *data, uint32_t len) |
| { |
| int rc, timeout; |
| slurm_msg_t msg; |
| forward_data_msg_t req; |
| list_t *ret_list; |
| ret_data_info_t *ret_data_info = NULL; |
| |
| pmixp_debug_hang(0); |
| |
| slurm_msg_t_init(&msg); |
| |
| PMIXP_DEBUG("nodelist=%s, address=%s, len=%u", nodename, address, len); |
| req.address = (char *)address; |
| req.len = len; |
| /* there is not much we can do - just cast) */ |
| req.data = (char*)data; |
| |
| msg.msg_type = REQUEST_FORWARD_DATA; |
| msg.data = &req; |
| |
| if (slurm_conf_get_addr(nodename, &msg.address, msg.flags) |
| == SLURM_ERROR) { |
| PMIXP_ERROR("Can't find address for host " |
| "%s, check slurm.conf", nodename); |
| return SLURM_ERROR; |
| } |
| |
| timeout = slurm_conf.msg_timeout * 1000; |
| msg.forward.timeout = timeout; |
| msg.forward.cnt = 0; |
| msg.forward.nodelist = NULL; |
| slurm_msg_set_r_uid(&msg, slurm_conf.slurmd_user_id); |
| ret_list = slurm_send_addr_recv_msgs(&msg, (char*)nodename, timeout); |
| if (!ret_list) { |
| /* This should never happen (when this was |
| * written slurm_send_addr_recv_msgs always |
| * returned a list */ |
| PMIXP_ERROR("No return list given from " |
| "slurm_send_addr_recv_msgs spawned for %s", |
| nodename); |
| return SLURM_ERROR; |
| } else if ((errno != SLURM_COMMUNICATIONS_CONNECTION_ERROR) && |
| !list_count(ret_list)) { |
| PMIXP_ERROR("failed to send to %s, errno=%d", nodename, errno); |
| return SLURM_ERROR; |
| } |
| |
| rc = SLURM_SUCCESS; |
| while ((ret_data_info = list_pop(ret_list))) { |
| int temp_rc = slurm_get_return_code(ret_data_info->type, |
| ret_data_info->data); |
| if (temp_rc != SLURM_SUCCESS) |
| rc = temp_rc; |
| destroy_data_info(ret_data_info); |
| } |
| |
| FREE_NULL_LIST(ret_list); |
| |
| return rc; |
| } |
| |
| int pmixp_p2p_send(const char *nodename, const char *address, const char *data, |
| uint32_t len, unsigned int start_delay, |
| unsigned int retry_cnt, int silent) |
| { |
| int retry = 0, rc = SLURM_SUCCESS; |
| unsigned int delay = start_delay; /* in milliseconds */ |
| |
| pmixp_debug_hang(0); |
| |
| while (1) { |
| if (!silent && retry >= 1) { |
| PMIXP_DEBUG("send failed, rc=%d, try #%d", rc, retry); |
| } |
| |
| rc = _pmix_p2p_send_core(nodename, address, data, len); |
| |
| if (rc == SLURM_SUCCESS) |
| break; |
| |
| retry++; |
| if (retry >= retry_cnt) { |
| PMIXP_ERROR("send failed, rc=%d, exceeded the retry limit", rc); |
| break; |
| } |
| |
| /* wait with constantly increasing delay */ |
| struct timespec ts = |
| {(delay / MSEC_IN_SEC), |
| ((delay % MSEC_IN_SEC) * NSEC_IN_MSEC)}; |
| nanosleep(&ts, NULL); |
| delay *= 2; |
| } |
| |
| return rc; |
| } |
| |
| int pmixp_mkdir(char *path, bool trusted) |
| { |
| char *base = NULL, *newdir = NULL, *slash; |
| int dirfd, flags; |
| mode_t rights = (S_IRUSR | S_IWUSR | S_IXUSR); |
| |
| /* NOTE: we need user who owns the job to access PMIx usock |
| * file. According to 'man 7 unix': |
| * "... In the Linux implementation, sockets which are visible in the |
| * file system honor the permissions of the directory they are in... " |
| * Our case is the following: slurmstepd is usually running as root, |
| * user application will be "sudo'ed". To provide both of them with |
| * access to the unix socket we do the following: |
| * 1. Owner ID is set to the job owner. |
| * 2. Group ID corresponds to slurmstepd. |
| * 3. Set 0700 access mode |
| */ |
| |
| base = xstrdup(path); |
| /* split into base and new directory name */ |
| while ((slash = strrchr(base, '/'))) { |
| /* fix a path with one or more trailing slashes */ |
| if (slash[1] == '\0') |
| slash[0] = '\0'; |
| else |
| break; |
| } |
| |
| if (!slash) { |
| PMIXP_ERROR_STD("Invalid directory \"%s\"", path); |
| xfree(base); |
| return EINVAL; |
| } |
| |
| slash[0] = '\0'; |
| newdir = slash + 1; |
| flags = O_DIRECTORY; |
| if (!trusted) |
| flags |= O_NOFOLLOW; |
| |
| if ((dirfd = open(base, flags)) < 0) { |
| PMIXP_ERROR_STD("Could not open parent directory \"%s\"", base); |
| xfree(base); |
| return errno; |
| } |
| |
| #ifdef MULTIPLE_SLURMD |
| struct stat statbuf; |
| flags = 0; |
| if (!trusted) |
| flags |= AT_SYMLINK_NOFOLLOW; |
| if (!fstatat(dirfd, newdir, &statbuf, flags)) { |
| if ((statbuf.st_mode & S_IFDIR) && |
| (statbuf.st_uid == pmixp_info_jobuid())) { |
| PMIXP_ERROR_STD("Directory \"%s\" already exists, but has correct uid", |
| path); |
| close(dirfd); |
| xfree(base); |
| return 0; |
| } |
| } |
| #endif |
| |
| if (mkdirat(dirfd, newdir, rights) < 0) { |
| PMIXP_ERROR_STD("Cannot create directory \"%s\"", |
| path); |
| close(dirfd); |
| xfree(base); |
| return errno; |
| } |
| |
| if (fchownat(dirfd, newdir, (uid_t) pmixp_info_jobuid(), (gid_t) -1, |
| AT_SYMLINK_NOFOLLOW) < 0) { |
| error("%s: fchownath(%s): %m", __func__, path); |
| close(dirfd); |
| xfree(base); |
| return errno; |
| } |
| |
| close(dirfd); |
| xfree(base); |
| return 0; |
| } |