| /*****************************************************************************\ |
| * src/slurmd/slurmstepd/io.c - Standard I/O handling routines for slurmstepd |
| ***************************************************************************** |
| * Copyright (C) 2002 The Regents of the University of California. |
| * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). |
| * Written by Mark Grondona <mgrondona@llnl.gov>. |
| * CODE-OCEC-09-009. All rights reserved. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #include "config.h" |
| |
| #define _GNU_SOURCE /* for setresuid(3) */ |
| |
| #ifdef HAVE_PTY_H |
| # include <pty.h> |
| #endif |
| |
| #ifdef HAVE_UTMP_H |
| # include <utmp.h> |
| #endif |
| |
| #include <errno.h> |
| #include <fcntl.h> |
| #include <poll.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <sys/socket.h> |
| #include <sys/stat.h> |
| #include <sys/types.h> |
| #include <termios.h> |
| #include <unistd.h> |
| |
| #include "src/common/cbuf.h" |
| #include "src/common/eio.h" |
| #include "src/common/fd.h" |
| #include "src/common/io_hdr.h" |
| #include "src/common/list.h" |
| #include "src/common/log.h" |
| #include "src/common/macros.h" |
| #include "src/common/net.h" |
| #include "src/common/read_config.h" |
| #include "src/common/write_labelled_message.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/xstring.h" |
| |
| #include "src/interfaces/conn.h" |
| |
| #include "src/slurmd/common/fname.h" |
| #include "src/slurmd/slurmd/slurmd.h" |
| #include "src/slurmd/slurmstepd/io.h" |
| #include "src/slurmd/slurmstepd/slurmstepd.h" |
| |
| /* |
| * The message cache uses up free message buffers, so STDIO_MAX_MSG_CACHE |
| * must be a number smaller than STDIO_MAX_FREE_BUF. |
| */ |
| #define STDIO_MAX_FREE_BUF 1024 |
| #define STDIO_MAX_MSG_CACHE 128 |
| |
| #define STDIO_FILE_RETRIES 10 |
| |
/*
 * Message buffer passed between the I/O queues in this file.
 */
struct io_buf {
	int ref_count;		/* bumped once per queue the buffer is put on */
	uint32_t length;	/* number of message bytes held in data */
	void *data;		/* message bytes */
};

/* Allocation and release of struct io_buf objects */
static void _free_io_buf(struct io_buf *buf);
static struct io_buf *_alloc_io_buf(void);
| |
| /********************************************************************** |
| * IO client socket declarations |
| **********************************************************************/ |
| static bool _client_readable(eio_obj_t *); |
| static bool _client_writable(eio_obj_t *); |
| static int _client_read(eio_obj_t *, list_t *); |
| static int _client_write(eio_obj_t *, list_t *); |
| |
| struct io_operations client_ops = { |
| .readable = &_client_readable, |
| .writable = &_client_writable, |
| .handle_read = &_client_read, |
| .handle_write = &_client_write, |
| }; |
| |
| #define CLIENT_IO_MAGIC 0x10102 |
| struct client_io_info { |
| int magic; |
| stepd_step_rec_t *step; /* pointer back to step data */ |
| |
| /* incoming variables */ |
| io_hdr_t header; |
| struct io_buf *in_msg; |
| int32_t in_remaining; |
| bool in_eof; |
| |
| /* outgoing variables */ |
| list_t *msg_queue; |
| struct io_buf *out_msg; |
| int32_t out_remaining; |
| bool out_eof; |
| |
| /* For clients that only write stdout or stderr, and/or only |
| write for one task. -1 means accept output from any task. */ |
| int ltaskid_stdout, ltaskid_stderr; |
| bool labelio; |
| int taskid_width; |
| |
| /* true if writing to a file, false if writing to a socket */ |
| bool is_local_file; |
| }; |
| |
| |
| static bool _local_file_writable(eio_obj_t *); |
| static int _local_file_write(eio_obj_t *, list_t *); |
| |
| struct io_operations local_file_ops = { |
| .writable = &_local_file_writable, |
| .handle_write = &_local_file_write, |
| }; |
| |
| |
| /********************************************************************** |
| * Task write declarations |
| **********************************************************************/ |
| static bool _task_writable(eio_obj_t *); |
| static int _task_write(eio_obj_t *, list_t *); |
| static int _task_write_error(eio_obj_t *obj, list_t *objs); |
| |
| struct io_operations task_write_ops = { |
| .writable = &_task_writable, |
| .handle_write = &_task_write, |
| .handle_error = &_task_write_error, |
| }; |
| |
| #define TASK_IN_MAGIC 0x10103 |
| struct task_write_info { |
| int magic; |
| stepd_step_rec_t *step; /* pointer back to step data */ |
| |
| list_t *msg_queue; |
| struct io_buf *msg; |
| int32_t remaining; |
| }; |
| |
| /********************************************************************** |
| * Task read declarations |
| **********************************************************************/ |
| static bool _task_readable(eio_obj_t *); |
| static int _task_read(eio_obj_t *, list_t *); |
| |
| struct io_operations task_read_ops = { |
| .readable = &_task_readable, |
| .handle_read = &_task_read, |
| }; |
| |
| #define TASK_OUT_MAGIC 0x10103 |
| struct task_read_info { |
| int magic; |
| uint16_t type; /* type of IO object */ |
| uint16_t gtaskid; |
| uint16_t ltaskid; |
| stepd_step_rec_t *step; /* pointer back to step data */ |
| cbuf_t *buf; |
| bool eof; |
| bool eof_msg_sent; |
| }; |
| |
| /********************************************************************** |
| * Pseudo terminal declarations |
| **********************************************************************/ |
| struct window_info { |
| stepd_step_task_info_t *task; |
| stepd_step_rec_t *step; |
| void *conn; |
| }; |
| #ifdef HAVE_PTY_H |
| static void _spawn_window_manager(stepd_step_task_info_t *task, stepd_step_rec_t *step); |
| static void *_window_manager(void *arg); |
| #endif |
| |
| /********************************************************************** |
| * General declarations |
| **********************************************************************/ |
| static void *_io_thr(void *); |
| static int _send_io_init_msg(int sock, void *conn, srun_info_t *srun, |
| stepd_step_rec_t *step, bool init); |
| static void _send_eof_msg(struct task_read_info *out); |
| static struct io_buf *_task_build_message(struct task_read_info *out, |
| stepd_step_rec_t *step, cbuf_t *cbuf); |
| static void *_io_thr(void *arg); |
| static void _route_msg_task_to_client(eio_obj_t *obj); |
| static void _free_outgoing_msg(struct io_buf *msg, stepd_step_rec_t *step); |
| static void _free_incoming_msg(struct io_buf *msg, stepd_step_rec_t *step); |
| static void _free_all_outgoing_msgs(list_t *msg_queue, stepd_step_rec_t *step); |
| static bool _incoming_buf_free(stepd_step_rec_t *step); |
| static bool _outgoing_buf_free(stepd_step_rec_t *step); |
| static int _send_connection_okay_response(stepd_step_rec_t *step); |
| static struct io_buf *_build_connection_okay_message(stepd_step_rec_t *step); |
| |
| /********************************************************************** |
| * IO client socket functions |
| **********************************************************************/ |
| static bool |
| _client_readable(eio_obj_t *obj) |
| { |
| struct client_io_info *client = (struct client_io_info *) obj->arg; |
| |
| debug5("Called _client_readable"); |
| xassert(client->magic == CLIENT_IO_MAGIC); |
| |
| if (client->in_eof) { |
| debug5(" false, in_eof"); |
| /* We no longer want the _client_read() function to handle |
| errors on write now that the read side of the connection |
| is closed. Setting handle_read to NULL will result in |
| the _client_write function handling errors, and closing |
| down the write end of the connection. */ |
| obj->ops->handle_read = NULL; |
| return false; |
| } |
| |
| if (obj->shutdown) { |
| debug5(" false, shutdown"); |
| shutdown(obj->fd, SHUT_RD); |
| client->in_eof = true; |
| return false; |
| } |
| |
| if (client->in_msg != NULL |
| || _incoming_buf_free(client->step)) |
| return true; |
| |
| debug5(" false"); |
| return false; |
| } |
| |
| static bool |
| _client_writable(eio_obj_t *obj) |
| { |
| struct client_io_info *client = (struct client_io_info *) obj->arg; |
| |
| debug5("Called _client_writable"); |
| xassert(client->magic == CLIENT_IO_MAGIC); |
| |
| if (client->out_eof == true) { |
| debug5(" false, out_eof"); |
| return false; |
| } |
| |
| /* If this is a newly attached client its msg_queue needs |
| * to be initialized from the outgoing_cache, and then "obj" needs |
| * to be added to the list of clients. |
| */ |
| if (client->msg_queue == NULL) { |
| list_itr_t *msgs; |
| struct io_buf *msg; |
| client->msg_queue = list_create(NULL); /* need destructor */ |
| msgs = list_iterator_create(client->step->outgoing_cache); |
| while ((msg = list_next(msgs))) { |
| msg->ref_count++; |
| list_enqueue(client->msg_queue, msg); |
| } |
| list_iterator_destroy(msgs); |
| /* and now make this object visible to tasks */ |
| list_append(client->step->clients, (void *)obj); |
| } |
| |
| if (client->out_msg != NULL) |
| debug5(" client->out.msg != NULL"); |
| if (!list_is_empty(client->msg_queue)) |
| debug5(" client->out.msg_queue queue length = %d", |
| list_count(client->msg_queue)); |
| |
| if (client->out_msg != NULL |
| || !list_is_empty(client->msg_queue)) |
| return true; |
| |
| debug5(" false"); |
| return false; |
| } |
| |
| static int _client_read(eio_obj_t *obj, list_t *objs) |
| { |
| struct client_io_info *client = (struct client_io_info *) obj->arg; |
| void *buf; |
| int n; |
| |
| debug4("Entering _client_read"); |
| xassert(client->magic == CLIENT_IO_MAGIC); |
| |
| /* |
| * Read the header, if a message read is not already in progress |
| */ |
| if (client->in_msg == NULL) { |
| if (_incoming_buf_free(client->step)) { |
| client->in_msg = |
| list_dequeue(client->step->free_incoming); |
| } else { |
| debug5(" _client_read free_incoming is empty"); |
| return SLURM_SUCCESS; |
| } |
| n = io_hdr_read_fd(obj->fd, obj->conn, &client->header); |
| if (n <= 0) { /* got eof or fatal error */ |
| debug5(" got eof or error _client_read header, n=%d", n); |
| client->in_eof = true; |
| list_enqueue(client->step->free_incoming, |
| client->in_msg); |
| client->in_msg = NULL; |
| return SLURM_SUCCESS; |
| } |
| debug5("client->header.length = %u", client->header.length); |
| if (client->header.length > SLURM_IO_MAX_MSG_LEN) |
| error("Message length of %u exceeds maximum of %u", |
| client->header.length, SLURM_IO_MAX_MSG_LEN); |
| client->in_remaining = client->header.length; |
| client->in_msg->length = client->header.length; |
| } |
| |
| /* |
| * Read the body |
| */ |
| if (client->header.type == SLURM_IO_CONNECTION_TEST) { |
| if (client->header.length != 0) { |
| debug5(" error in _client_read: bad connection test"); |
| list_enqueue(client->step->free_incoming, |
| client->in_msg); |
| client->in_msg = NULL; |
| return SLURM_ERROR; |
| } |
| if (_send_connection_okay_response(client->step)) { |
| /* |
| * If we get here because of a failed |
| * _send_connection_okay_response, it's because of a |
| * lack of buffer space in the output queue. Just |
| * keep the current input message client->in_msg in |
| * place, and resend on the next call. |
| */ |
| return SLURM_SUCCESS; |
| } |
| list_enqueue(client->step->free_incoming, client->in_msg); |
| client->in_msg = NULL; |
| return SLURM_SUCCESS; |
| } else if (client->header.length == 0) { /* zero length is an eof message */ |
| debug5(" got stdin eof message!"); |
| } else { |
| buf = client->in_msg->data + |
| (client->in_msg->length - client->in_remaining); |
| again: |
| if (obj->conn) { |
| n = conn_g_recv(obj->conn, buf, client->in_remaining); |
| } else { |
| n = read(obj->fd, buf, client->in_remaining); |
| } |
| if (n < 0) { |
| if (errno == EINTR) |
| goto again; |
| if (errno == EAGAIN || errno == EWOULDBLOCK) { |
| debug5("_client_read returned %s", |
| errno == EAGAIN ? "EAGAIN" : "EWOULDBLOCK"); |
| return SLURM_SUCCESS; |
| } |
| debug5(" error in _client_read: %m"); |
| } |
| if (n <= 0) { /* got eof (or unhandled error) */ |
| debug5(" got eof on _client_read body"); |
| client->in_eof = true; |
| list_enqueue(client->step->free_incoming, |
| client->in_msg); |
| client->in_msg = NULL; |
| return SLURM_SUCCESS; |
| } |
| client->in_remaining -= n; |
| if (client->in_remaining > 0) |
| return SLURM_SUCCESS; |
| /* *(char *)(buf + n) = '\0'; */ |
| /* debug5("\"%s\"", buf); */ |
| } |
| |
| /* |
| * Route the message to its destination(s) |
| */ |
| if (client->header.type != SLURM_IO_STDIN |
| && client->header.type != SLURM_IO_ALLSTDIN) { |
| error("Input client->header.type is not valid!"); |
| client->in_msg = NULL; |
| return SLURM_ERROR; |
| } else { |
| int i; |
| stepd_step_task_info_t *task; |
| struct task_write_info *io; |
| |
| client->in_msg->ref_count = 0; |
| if (client->header.type == SLURM_IO_ALLSTDIN) { |
| for (i = 0; i < client->step->node_tasks; i++) { |
| task = client->step->task[i]; |
| io = (struct task_write_info *)task->in->arg; |
| client->in_msg->ref_count++; |
| list_enqueue(io->msg_queue, client->in_msg); |
| } |
| debug5(" message ref_count = %d", client->in_msg->ref_count); |
| } else { |
| for (i = 0; i < client->step->node_tasks; i++) { |
| task = client->step->task[i]; |
| if (task->in == NULL) |
| continue; |
| io = (struct task_write_info *)task->in->arg; |
| if (task->gtid != client->header.gtaskid) |
| continue; |
| client->in_msg->ref_count++; |
| list_enqueue(io->msg_queue, client->in_msg); |
| break; |
| } |
| } |
| } |
| client->in_msg = NULL; |
| debug4("Leaving _client_read"); |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Write outgoing packed messages to the client socket. |
| */ |
| static int _client_write(eio_obj_t *obj, list_t *objs) |
| { |
| struct client_io_info *client = (struct client_io_info *) obj->arg; |
| void *buf; |
| int n; |
| |
| xassert(client->magic == CLIENT_IO_MAGIC); |
| |
| debug4("Entering _client_write"); |
| |
| /* |
| * If we aren't already in the middle of sending a message, get the |
| * next message from the queue. |
| */ |
| if (client->out_msg == NULL) { |
| client->out_msg = list_dequeue(client->msg_queue); |
| if (client->out_msg == NULL) { |
| debug5("_client_write: nothing in the queue"); |
| return SLURM_SUCCESS; |
| } |
| debug5(" dequeue successful, client->out_msg->length = %d", |
| client->out_msg->length); |
| client->out_remaining = client->out_msg->length; |
| } |
| |
| debug5(" client->out_remaining = %d", client->out_remaining); |
| |
| /* |
| * Write message to socket. |
| */ |
| buf = client->out_msg->data + |
| (client->out_msg->length - client->out_remaining); |
| again: |
| if (obj->conn) { |
| n = conn_g_send(obj->conn, buf, client->out_remaining); |
| } else { |
| n = write(obj->fd, buf, client->out_remaining); |
| } |
| if (n < 0) { |
| if (errno == EINTR) { |
| goto again; |
| } else if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { |
| debug5("_client_write returned EAGAIN"); |
| return SLURM_SUCCESS; |
| } else { |
| client->out_eof = true; |
| _free_all_outgoing_msgs(client->msg_queue, |
| client->step); |
| return SLURM_SUCCESS; |
| } |
| } |
| if (n < client->out_remaining) { |
| error("Only wrote %d of %d bytes to socket", |
| n, client->out_remaining); |
| } else |
| debug5("Wrote %d bytes to socket", n); |
| client->out_remaining -= n; |
| if (client->out_remaining > 0) |
| return SLURM_SUCCESS; |
| |
| _free_outgoing_msg(client->out_msg, client->step); |
| client->out_msg = NULL; |
| |
| return SLURM_SUCCESS; |
| } |
| |
| |
| static bool |
| _local_file_writable(eio_obj_t *obj) |
| { |
| struct client_io_info *client = (struct client_io_info *) obj->arg; |
| |
| xassert(client->magic == CLIENT_IO_MAGIC); |
| |
| if (client->out_eof == true) |
| return false; |
| |
| if (client->out_msg != NULL || !list_is_empty(client->msg_queue)) |
| return true; |
| |
| return false; |
| } |
| |
| |
| /* |
| * The slurmstepd writes I/O to a file, possibly adding a label. |
| */ |
| static int _local_file_write(eio_obj_t *obj, list_t *objs) |
| { |
| struct client_io_info *client = (struct client_io_info *) obj->arg; |
| void *buf; |
| int n; |
| io_hdr_t header; |
| buf_t *header_tmp_buf; |
| |
| xassert(client->magic == CLIENT_IO_MAGIC); |
| /* |
| * If we aren't already in the middle of sending a message, get the |
| * next message from the queue. |
| */ |
| if (client->out_msg == NULL) { |
| client->out_msg = list_dequeue(client->msg_queue); |
| if (client->out_msg == NULL) { |
| return SLURM_SUCCESS; |
| } |
| client->out_remaining = |
| (client->out_msg->length - IO_HDR_PACKET_BYTES); |
| } |
| |
| /* |
| * This code to make a buffer, fill it, unpack its contents, and free |
| * it is just used to read the header to get the global task id. |
| */ |
| header_tmp_buf = create_buf(client->out_msg->data, |
| client->out_msg->length); |
| if (!header_tmp_buf) { |
| fatal("Failure to allocate memory for a message header"); |
| return SLURM_ERROR; /* Fix CLANG false positive error */ |
| } |
| io_hdr_unpack(&header, header_tmp_buf); |
| header_tmp_buf->head = NULL; /* CLANG false positive bug here */ |
| FREE_NULL_BUFFER(header_tmp_buf); |
| |
| /* |
| * A zero-length message indicates the end of a stream from one |
| * of the tasks. Just free the message and return. |
| */ |
| if (header.length == 0) { |
| _free_outgoing_msg(client->out_msg, client->step); |
| client->out_msg = NULL; |
| return SLURM_SUCCESS; |
| } |
| |
| /* Write the message to the file. */ |
| buf = client->out_msg->data + |
| (client->out_msg->length - client->out_remaining); |
| n = write_labelled_message(obj->fd, buf, client->out_remaining, |
| header.gtaskid, client->step->het_job_offset, |
| client->step->het_job_task_offset, |
| client->labelio, client->taskid_width); |
| if (n < 0) { |
| client->out_eof = true; |
| _free_all_outgoing_msgs(client->msg_queue, client->step); |
| return SLURM_ERROR; |
| } |
| |
| client->out_remaining -= n; |
| if (client->out_remaining == 0) { |
| _free_outgoing_msg(client->out_msg, client->step); |
| client->out_msg = NULL; |
| } |
| return SLURM_SUCCESS; |
| } |
| |
| |
| |
| |
| /********************************************************************** |
| * Task write functions |
| **********************************************************************/ |
| /* |
| * Create an eio_obj_t for handling a task's stdin traffic |
| */ |
| static eio_obj_t * |
| _create_task_in_eio(int fd, stepd_step_rec_t *step) |
| { |
| struct task_write_info *t = xmalloc(sizeof(*t)); |
| eio_obj_t *eio = NULL; |
| |
| t->magic = TASK_IN_MAGIC; |
| t->step = step; |
| t->msg_queue = list_create(NULL); /* FIXME! Add destructor */ |
| t->msg = NULL; |
| t->remaining = 0; |
| |
| eio = eio_obj_create(fd, &task_write_ops, (void *)t); |
| |
| return eio; |
| } |
| |
| static bool |
| _task_writable(eio_obj_t *obj) |
| { |
| struct task_write_info *t = (struct task_write_info *) obj->arg; |
| |
| debug5("Called _task_writable"); |
| |
| if (obj->fd == -1) { |
| debug5(" false, fd == -1"); |
| return false; |
| } |
| |
| if (t->msg != NULL || list_count(t->msg_queue) > 0) { |
| debug5(" true, list_count = %d", list_count(t->msg_queue)); |
| return true; |
| } |
| |
| debug5(" false (list_count = %d)", list_count(t->msg_queue)); |
| return false; |
| } |
| |
| static int _task_write_error(eio_obj_t *obj, list_t *objs) |
| { |
| debug4("Called _task_write_error, closing fd %d", obj->fd); |
| |
| close(obj->fd); |
| obj->fd = -1; |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static int _task_write(eio_obj_t *obj, list_t *objs) |
| { |
| struct task_write_info *in = (struct task_write_info *) obj->arg; |
| void *buf; |
| int n; |
| |
| debug4("Entering _task_write"); |
| xassert(in->magic == TASK_IN_MAGIC); |
| |
| /* |
| * If we aren't already in the middle of sending a message, get the |
| * next message from the queue. |
| */ |
| if (in->msg == NULL) { |
| in->msg = list_dequeue(in->msg_queue); |
| if (in->msg == NULL) { |
| debug5("_task_write: nothing in the queue"); |
| return SLURM_SUCCESS; |
| } |
| if (in->msg->length == 0) { /* eof message */ |
| close(obj->fd); |
| obj->fd = -1; |
| _free_incoming_msg(in->msg, in->step); |
| in->msg = NULL; |
| return SLURM_SUCCESS; |
| } |
| in->remaining = in->msg->length; |
| } |
| |
| /* |
| * Write message to pipe. |
| */ |
| buf = in->msg->data + (in->msg->length - in->remaining); |
| again: |
| if ((n = write(obj->fd, buf, in->remaining)) < 0) { |
| if (errno == EINTR) |
| goto again; |
| else if (errno == EAGAIN || errno == EWOULDBLOCK) |
| return SLURM_SUCCESS; |
| else { |
| close(obj->fd); |
| obj->fd = -1; |
| _free_incoming_msg(in->msg, in->step); |
| in->msg = NULL; |
| return SLURM_ERROR; |
| } |
| } |
| in->remaining -= n; |
| if (in->remaining > 0) |
| return SLURM_SUCCESS; |
| |
| _free_incoming_msg(in->msg, in->step); |
| in->msg = NULL; |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /********************************************************************** |
| * Task read functions |
| **********************************************************************/ |
| /* |
| * Create an eio_obj_t for handling a task's stdout or stderr traffic |
| */ |
| static eio_obj_t * |
| _create_task_out_eio(int fd, uint16_t type, |
| stepd_step_rec_t *step, stepd_step_task_info_t *task) |
| { |
| struct task_read_info *out = xmalloc(sizeof(*out)); |
| eio_obj_t *eio = NULL; |
| |
| out->magic = TASK_OUT_MAGIC; |
| out->type = type; |
| out->gtaskid = task->gtid; |
| out->ltaskid = task->id; |
| out->step = step; |
| out->buf = cbuf_create(SLURM_IO_MAX_MSG_LEN, |
| (SLURM_IO_MAX_MSG_LEN * 4)); |
| out->eof = false; |
| out->eof_msg_sent = false; |
| if (cbuf_opt_set(out->buf, CBUF_OPT_OVERWRITE, CBUF_NO_DROP) == -1) |
| error("setting cbuf options"); |
| |
| eio = eio_obj_create(fd, &task_read_ops, (void *)out); |
| |
| return eio; |
| } |
| |
| static bool |
| _task_readable(eio_obj_t *obj) |
| { |
| struct task_read_info *out = (struct task_read_info *)obj->arg; |
| |
| debug5("Called _task_readable, task %d, %s", out->gtaskid, |
| out->type == SLURM_IO_STDOUT ? "STDOUT" : "STDERR"); |
| |
| if (out->eof_msg_sent) { |
| debug5(" false, eof message sent"); |
| return false; |
| } |
| if (cbuf_free(out->buf) > 0) { |
| debug5(" cbuf_free = %d", cbuf_free(out->buf)); |
| return true; |
| } |
| |
| debug5(" false"); |
| return false; |
| } |
| |
| /* |
| * Read output (stdout or stderr) from a task into a cbuf. The cbuf |
| * allows whole lines to be packed into messages if line buffering |
| * is requested. |
| */ |
| static int _task_read(eio_obj_t *obj, list_t *objs) |
| { |
| struct task_read_info *out = (struct task_read_info *)obj->arg; |
| int len; |
| int rc = -1; |
| |
| xassert(out->magic == TASK_OUT_MAGIC); |
| |
| debug4("Entering _task_read for obj %zx", (size_t)obj); |
| len = cbuf_free(out->buf); |
| if (len > 0 && !out->eof) { |
| again: |
| if ((rc = cbuf_write_from_fd(out->buf, obj->fd, len, NULL)) |
| < 0) { |
| if (errno == EINTR) |
| goto again; |
| if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { |
| debug5("_task_read returned EAGAIN"); |
| return SLURM_SUCCESS; |
| } |
| debug5(" error in _task_read: %m"); |
| } |
| if (rc <= 0) { /* got eof */ |
| debug5(" got eof on task"); |
| out->eof = true; |
| } |
| } |
| |
| debug5("************************ %d bytes read from task %s", rc, |
| out->type == SLURM_IO_STDOUT ? "STDOUT" : "STDERR"); |
| |
| /* |
| * Put the message in client outgoing queues |
| */ |
| _route_msg_task_to_client(obj); |
| |
| /* |
| * Send the eof message |
| */ |
| if (cbuf_used(out->buf) == 0 && out->eof && !out->eof_msg_sent) { |
| _send_eof_msg(out); |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /********************************************************************** |
| * Pseudo terminal functions |
| **********************************************************************/ |
| #ifdef HAVE_PTY_H |
| static void *_window_manager(void *arg) |
| { |
| struct window_info *win_info = (struct window_info *) arg; |
| pty_winsz_t winsz; |
| ssize_t len; |
| struct winsize ws; |
| struct pollfd ufds; |
| char buf[4]; |
| |
| ufds.fd = conn_g_get_fd(win_info->conn); |
| ufds.events = POLLIN; |
| |
| while (1) { |
| if (poll(&ufds, 1, -1) <= 0) { |
| if (errno == EINTR) |
| continue; |
| error("poll(pty): %m"); |
| break; |
| } |
| if (!(ufds.revents & POLLIN)) { |
| /* ((ufds.revents & POLLHUP) || |
| * (ufds.revents & POLLERR)) */ |
| break; |
| } |
| len = slurm_read_stream(win_info->conn, buf, 4); |
| if ((len == -1) && ((errno == EINTR) || (errno == EAGAIN))) |
| continue; |
| if (len < 4) { |
| if (errno != SLURM_PROTOCOL_SOCKET_ZERO_BYTES_SENT) { |
| error("%s: read window size error: %m", |
| __func__); |
| } |
| return NULL; |
| } |
| memcpy(&winsz.cols, buf, 2); |
| memcpy(&winsz.rows, buf+2, 2); |
| ws.ws_col = ntohs(winsz.cols); |
| ws.ws_row = ntohs(winsz.rows); |
| debug("new pty size %u:%u", ws.ws_row, ws.ws_col); |
| if (ioctl(win_info->task->to_stdin, TIOCSWINSZ, &ws)) |
| error("ioctl(TIOCSWINSZ): %s", strerror(errno)); |
| if (kill(win_info->task->pid, SIGWINCH)) { |
| if (errno == ESRCH) |
| break; |
| error("kill(%d, SIGWINCH): %m", |
| (int)win_info->task->pid); |
| } |
| } |
| |
| conn_g_destroy(win_info->conn, true); |
| xfree(win_info); |
| |
| return NULL; |
| } |
| |
| static void |
| _spawn_window_manager(stepd_step_task_info_t *task, stepd_step_rec_t *step) |
| { |
| void *conn = NULL; |
| char *tls_cert = NULL; |
| char *host, *port, *rows, *cols; |
| slurm_addr_t pty_addr; |
| uint16_t port_u; |
| struct window_info *win_info; |
| |
| #if 0 |
| /* NOTE: SLURM_LAUNCH_NODE_IPADDR is not available at this point */ |
| if (!(ip_addr = getenvp(step->env, "SLURM_LAUNCH_NODE_IPADDR"))) { |
| error("SLURM_LAUNCH_NODE_IPADDR env var not set"); |
| return; |
| } |
| #endif |
| if (!(host = getenvp(step->env, "SLURM_SRUN_COMM_HOST"))) { |
| error("SLURM_SRUN_COMM_HOST env var not set"); |
| return; |
| } |
| if (!(port = getenvp(step->env, "SLURM_PTY_PORT"))) { |
| error("SLURM_PTY_PORT env var not set"); |
| return; |
| } |
| if (!(cols = getenvp(step->env, "SLURM_PTY_WIN_COL"))) |
| error("SLURM_PTY_WIN_COL env var not set"); |
| if (!(rows = getenvp(step->env, "SLURM_PTY_WIN_ROW"))) |
| error("SLURM_PTY_WIN_ROW env var not set"); |
| |
| if (rows && cols) { |
| struct winsize ws; |
| ws.ws_col = atoi(cols); |
| ws.ws_row = atoi(rows); |
| debug("init pty size %u:%u", ws.ws_row, ws.ws_col); |
| if (ioctl(task->to_stdin, TIOCSWINSZ, &ws)) |
| error("ioctl(TIOCSWINSZ): %s", strerror(errno)); |
| } |
| |
| port_u = atoi(port); |
| slurm_set_addr(&pty_addr, port_u, host); |
| |
| if (tls_enabled()) { |
| srun_info_t *srun = list_peek(step->sruns); |
| if (srun) |
| tls_cert = srun->tls_cert; |
| } |
| |
| if (!(conn = slurm_open_msg_conn(&pty_addr, tls_cert))) { |
| error("slurm_open_stream(pty_conn) %s,%u: %m", |
| host, port_u); |
| return; |
| } |
| |
| win_info = xmalloc(sizeof(struct window_info)); |
| win_info->task = task; |
| win_info->step = step; |
| win_info->conn = conn; |
| slurm_thread_create_detached(_window_manager, win_info); |
| } |
| #endif |
| |
| /********************************************************************** |
| * General functions |
| **********************************************************************/ |
| |
| /* |
| * This function sets the close-on-exec flag on all opened file descriptors. |
| * io_dup_stdio will remove the close-on-exec flags for just one task's |
| * file descriptors. |
| */ |
| static int |
| _init_task_stdio_fds(stepd_step_task_info_t *task, stepd_step_rec_t *step) |
| { |
| int file_flags = io_get_file_flags(step); |
| |
| /* |
| * Initialize stdin |
| */ |
| #ifdef HAVE_PTY_H |
| if (step->flags & LAUNCH_PTY) { |
| /* All of the stdin fails unless EVERY |
| * task gets an eio object for stdin. |
| * Its not clear why that is. */ |
| if (task->gtid == 0) { |
| debug(" stdin uses a pty object"); |
| #if HAVE_SETRESUID |
| /* |
| * openpty(3) calls grantpt(3), which sets |
| * the owner of the pty device to the *real* |
| * uid of the caller. Because of this, we must |
| * change our uid temporarily to that of the |
| * user (now the effective uid). We have to |
| * use setresuid(2) so that we keep a saved uid |
| * of root, and can regain previous permissions |
| * after the call to openpty. |
| */ |
| if (setresuid(geteuid(), geteuid(), 0) < 0) |
| error ("pre openpty: setresuid: %m"); |
| #endif |
| if (openpty(&task->to_stdin, &task->stdin_fd, |
| NULL, NULL, NULL) < 0) { |
| error("stdin openpty: %m"); |
| return SLURM_ERROR; |
| } |
| #if HAVE_SETRESUID |
| if (setresuid(0, getuid(), 0) < 0) |
| error ("post openpty: setresuid: %m"); |
| #endif |
| fd_set_close_on_exec(task->stdin_fd); |
| fd_set_close_on_exec(task->to_stdin); |
| fd_set_nonblocking(task->to_stdin); |
| _spawn_window_manager(task, step); |
| task->in = _create_task_in_eio(task->to_stdin, step); |
| eio_new_initial_obj(step->eio, (void *)task->in); |
| } else { |
| xfree(task->ifname); |
| task->ifname = xstrdup("/dev/null"); |
| task->stdin_fd = open("/dev/null", O_RDWR | O_CLOEXEC); |
| if (task->stdin_fd < 0) { |
| error("Unable to open /dev/null: %m"); |
| return SLURM_ERROR; |
| } |
| task->to_stdin = dup(task->stdin_fd); |
| fd_set_nonblocking(task->to_stdin); |
| task->in = _create_task_in_eio(task->to_stdin, step); |
| eio_new_initial_obj(step->eio, (void *)task->in); |
| } |
| } else if (task->ifname != NULL) { |
| #else |
| if (task->ifname != NULL) { |
| #endif |
| int count = 0; |
| /* open file on task's stdin */ |
| debug5(" stdin file name = %s", task->ifname); |
| do { |
| if ((task->stdin_fd = open(task->ifname, |
| (O_RDONLY | O_CLOEXEC))) |
| != -1) |
| break; |
| |
| /* "Retry-able" errors. */ |
| if (errno == EINTR) { |
| debug("%s: Could not open stdin file '%s': '%s'. Attempt [%d/%d], retrying.", |
| __func__, |
| task->ifname, |
| strerror(errno), |
| (count + 1), |
| STDIO_FILE_RETRIES); |
| count++; |
| continue; |
| } |
| |
| /* Non-"retryable" errors. */ |
| break; |
| |
| } while (count < STDIO_FILE_RETRIES); |
| if (task->stdin_fd == -1) { |
| error("Could not open stdin file %s: %m", task->ifname); |
| return SLURM_ERROR; |
| } |
| task->to_stdin = -1; /* not used */ |
| } else { |
| /* create pipe and eio object */ |
| int pin[2]; |
| |
| debug5(" stdin uses an eio object"); |
| if (pipe2(pin, O_CLOEXEC) < 0) { |
| error("stdin pipe: %m"); |
| return SLURM_ERROR; |
| } |
| task->stdin_fd = pin[0]; |
| task->to_stdin = pin[1]; |
| fd_set_nonblocking(task->to_stdin); |
| task->in = _create_task_in_eio(task->to_stdin, step); |
| eio_new_initial_obj(step->eio, (void *)task->in); |
| } |
| |
| /* |
| * Initialize stdout |
| */ |
| #ifdef HAVE_PTY_H |
| if (step->flags & LAUNCH_PTY) { |
| if (task->gtid == 0) { |
| task->stdout_fd = dup(task->stdin_fd); |
| fd_set_close_on_exec(task->stdout_fd); |
| task->from_stdout = dup(task->to_stdin); |
| fd_set_close_on_exec(task->from_stdout); |
| fd_set_nonblocking(task->from_stdout); |
| task->out = _create_task_out_eio(task->from_stdout, |
| SLURM_IO_STDOUT, step, task); |
| list_append(step->stdout_eio_objs, (void *)task->out); |
| eio_new_initial_obj(step->eio, (void *)task->out); |
| } else { |
| xfree(task->ofname); |
| task->ofname = xstrdup("/dev/null"); |
| task->stdout_fd = open("/dev/null", O_RDWR, O_CLOEXEC); |
| task->from_stdout = -1; /* not used */ |
| } |
| } else if ((task->ofname != NULL) && |
| (((step->flags & LAUNCH_LABEL_IO) == 0) || |
| (xstrcmp(task->ofname, "/dev/null") == 0))) { |
| #else |
| if (task->ofname != NULL && |
| (((step->flags & LAUNCH_LABEL_IO) == 0) || |
| xstrcmp(task->ofname, "/dev/null") == 0)) { |
| #endif |
| int count = 0, mkdir_rc; |
| bool tried_mkdir = false; |
| /* open file on task's stdout */ |
| debug5(" stdout file name = %s", task->ofname); |
| do { |
| if ((task->stdout_fd = open(task->ofname, |
| file_flags | O_CLOEXEC, |
| 0666)) != -1) |
| break; |
| |
| /* "Retry-able" errors. */ |
| if (errno == EINTR) { |
| debug("%s: Could not open stdout file '%s': '%s'. Attempt [%d/%d], retrying.", |
| __func__, |
| task->ofname, |
| strerror(count + 1), |
| (count + 1), |
| STDIO_FILE_RETRIES); |
| count++; |
| continue; |
| } |
| |
| if (errno == ENOENT && !tried_mkdir) { |
| mkdir_rc = mkdirpath(task->ofname, 0755, false); |
| tried_mkdir = true; |
| if (mkdir_rc == SLURM_SUCCESS) { |
| debug("%s: Could not open stdout file '%s': '%s'. Retrying after successful path creation.", |
| __func__, |
| task->ofname, |
| strerror(ENOENT)); |
| continue; |
| } else { |
| error("%s: Could not open stdout file '%s': '%s'. Recursive path creation failed: '%s'.", |
| __func__, |
| task->ofname, |
| strerror(ENOENT), |
| strerror(mkdir_rc)); |
| return SLURM_ERROR; |
| } |
| } |
| |
| /* Non-"retryable" errors. */ |
| break; |
| |
| } while (count < STDIO_FILE_RETRIES); |
| if (task->stdout_fd == -1) { |
| error("Could not open stdout file %s: %m", |
| task->ofname); |
| return SLURM_ERROR; |
| } |
| task->from_stdout = -1; /* not used */ |
| } else { |
| /* create pipe and eio object */ |
| int pout[2]; |
| #if HAVE_PTY_H |
| struct termios tio; |
| if (!(step->flags & LAUNCH_BUFFERED_IO)) { |
| #if HAVE_SETRESUID |
| if (setresuid(geteuid(), geteuid(), 0) < 0) |
| error("%s: %u setresuid() %m", |
| __func__, geteuid()); |
| #endif |
| if (openpty(pout, pout + 1, NULL, NULL, NULL) < 0) { |
| error("%s: stdout openpty: %m", __func__); |
| return SLURM_ERROR; |
| } |
| memset(&tio, 0, sizeof(tio)); |
| if (tcgetattr(pout[1], &tio) == 0) { |
| tio.c_oflag &= ~OPOST; |
| if (tcsetattr(pout[1], 0, &tio) != 0) |
| error("%s: tcsetattr: %m", __func__); |
| } |
| #if HAVE_SETRESUID |
| if (setresuid(0, getuid(), 0) < 0) |
| error("%s 0 setresuid() %m", __func__); |
| #endif |
| } else { |
| debug5(" stdout uses an eio object"); |
| if (pipe(pout) < 0) { |
| error("stdout pipe: %m"); |
| return SLURM_ERROR; |
| } |
| } |
| #else |
| debug5(" stdout uses an eio object"); |
| if (pipe(pout) < 0) { |
| error("stdout pipe: %m"); |
| return SLURM_ERROR; |
| } |
| #endif |
| task->stdout_fd = pout[1]; |
| fd_set_close_on_exec(task->stdout_fd); |
| task->from_stdout = pout[0]; |
| fd_set_close_on_exec(task->from_stdout); |
| fd_set_nonblocking(task->from_stdout); |
| task->out = _create_task_out_eio(task->from_stdout, |
| SLURM_IO_STDOUT, step, task); |
| list_append(step->stdout_eio_objs, (void *)task->out); |
| eio_new_initial_obj(step->eio, (void *)task->out); |
| } |
| |
| /* |
| * Initialize stderr |
| */ |
| #ifdef HAVE_PTY_H |
| if (step->flags & LAUNCH_PTY) { |
| if (task->gtid == 0) { |
| /* Make a file descriptor for the task to write to, but |
| don't make a separate one read from, because in pty |
| mode we can't distinguish between stdout and stderr |
| coming from the remote shell. Both streams from the |
| shell will go to task->stdout_fd, which is okay in |
| pty mode because any output routed through the stepd |
| will be displayed. */ |
| task->stderr_fd = dup(task->stdin_fd); |
| fd_set_close_on_exec(task->stderr_fd); |
| task->from_stderr = -1; |
| } else { |
| xfree(task->efname); |
| task->efname = xstrdup("/dev/null"); |
| task->stderr_fd = open("/dev/null", O_RDWR | O_CLOEXEC); |
| task->from_stderr = -1; /* not used */ |
| } |
| |
| } else if ((task->efname != NULL) && |
| (((step->flags & LAUNCH_LABEL_IO) == 0) || |
| (xstrcmp(task->efname, "/dev/null") == 0))) { |
| #else |
| if ((task->efname != NULL) && |
| (((step->flags & LAUNCH_LABEL_IO) == 0) || |
| (xstrcmp(task->efname, "/dev/null") == 0))) { |
| #endif |
| int count = 0, mkdir_rc; |
| bool tried_mkdir = false; |
| /* open file on task's stdout */ |
| debug5(" stderr file name = %s", task->efname); |
| do { |
| if ((task->stderr_fd = open(task->efname, |
| file_flags | O_CLOEXEC, |
| 0666)) != -1) |
| break; |
| |
| /* "Retry-able" errors. */ |
| if (errno == EINTR) { |
| debug("%s: Could not open stderr file '%s': '%s'. Attempt [%d/%d], retrying.", |
| __func__, |
| task->efname, |
| strerror(errno), |
| (count + 1), |
| STDIO_FILE_RETRIES); |
| count++; |
| continue; |
| } |
| |
| if (errno == ENOENT && !tried_mkdir) { |
| mkdir_rc = mkdirpath(task->efname, 0755, false); |
| tried_mkdir = true; |
| if (mkdir_rc == SLURM_SUCCESS) { |
| debug("%s: Could not open stderr file '%s': '%s'. Retrying after successful path creation.", |
| __func__, |
| task->efname, |
| strerror(ENOENT)); |
| continue; |
| } else { |
| error("%s: Could not open stderr file '%s': '%s'. Recursive path creation failed: '%s'.", |
| __func__, |
| task->efname, |
| strerror(ENOENT), |
| strerror(mkdir_rc)); |
| return SLURM_ERROR; |
| } |
| } |
| |
| /* Non-"retryable" errors. */ |
| break; |
| |
| } while (count < STDIO_FILE_RETRIES); |
| if (task->stderr_fd == -1) { |
| error("Could not open stderr file %s: %m", |
| task->efname); |
| return SLURM_ERROR; |
| } |
| task->from_stderr = -1; /* not used */ |
| } else { |
| /* create pipe and eio object */ |
| int perr[2]; |
| debug5(" stderr uses an eio object"); |
| if (pipe(perr) < 0) { |
| error("stderr pipe: %m"); |
| return SLURM_ERROR; |
| } |
| task->stderr_fd = perr[1]; |
| fd_set_close_on_exec(task->stderr_fd); |
| task->from_stderr = perr[0]; |
| fd_set_close_on_exec(task->from_stderr); |
| fd_set_nonblocking(task->from_stderr); |
| task->err = _create_task_out_eio(task->from_stderr, |
| SLURM_IO_STDERR, step, task); |
| list_append(step->stderr_eio_objs, (void *)task->err); |
| eio_new_initial_obj(step->eio, (void *)task->err); |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
| int |
| io_init_tasks_stdio(stepd_step_rec_t *step) |
| { |
| int i, rc = SLURM_SUCCESS, tmprc; |
| |
| for (i = 0; i < step->node_tasks; i++) { |
| tmprc = _init_task_stdio_fds(step->task[i], step); |
| if (tmprc != SLURM_SUCCESS) |
| rc = tmprc; |
| } |
| |
| return rc; |
| } |
| |
| extern void io_thread_start(stepd_step_rec_t *step) |
| { |
| slurm_mutex_lock(&step->io_mutex); |
| slurm_thread_create_detached(_io_thr, step); |
| step->io_running = true; |
| slurm_mutex_unlock(&step->io_mutex); |
| } |
| |
| static void _shrink_msg_cache(list_t *cache, stepd_step_rec_t *step) |
| { |
| struct io_buf *msg; |
| int over = 0; |
| int count; |
| int i; |
| |
| count = list_count(cache); |
| if (count > STDIO_MAX_MSG_CACHE) |
| over = count - STDIO_MAX_MSG_CACHE; |
| |
| for (i = 0; i < over; i++) { |
| msg = list_dequeue(cache); |
| /* FIXME - following call MIGHT lead to too much recursion */ |
| _free_outgoing_msg(msg, step); |
| } |
| } |
| |
| |
| |
| static int |
| _send_connection_okay_response(stepd_step_rec_t *step) |
| { |
| eio_obj_t *eio; |
| list_itr_t *clients; |
| struct io_buf *msg; |
| struct client_io_info *client; |
| |
| msg = _build_connection_okay_message(step); |
| if (!msg) { |
| error( "Could not send connection okay message because of " |
| "lack of buffer space."); |
| return SLURM_ERROR; |
| } |
| |
| clients = list_iterator_create(step->clients); |
| while ((eio = list_next(clients))) { |
| client = (struct client_io_info *)eio->arg; |
| if (client->out_eof || client->is_local_file) |
| continue; |
| |
| debug5("Sent connection okay message"); |
| xassert(client->magic == CLIENT_IO_MAGIC); |
| list_enqueue(client->msg_queue, msg); |
| msg->ref_count++; |
| } |
| list_iterator_destroy(clients); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| |
| |
| static struct io_buf * |
| _build_connection_okay_message(stepd_step_rec_t *step) |
| { |
| struct io_buf *msg; |
| buf_t *packbuf; |
| io_hdr_t header; |
| |
| if (_outgoing_buf_free(step)) { |
| msg = list_dequeue(step->free_outgoing); |
| } else { |
| return NULL; |
| } |
| |
| header.type = SLURM_IO_CONNECTION_TEST; |
| header.ltaskid = 0; /* Unused */ |
| header.gtaskid = 0; /* Unused */ |
| header.length = 0; |
| |
| packbuf = create_buf(msg->data, IO_HDR_PACKET_BYTES); |
| if (!packbuf) { |
| fatal("Failure to allocate memory for a message header"); |
| return msg; /* Fix for CLANG false positive error */ |
| } |
| io_hdr_pack(&header, packbuf); |
| msg->length = IO_HDR_PACKET_BYTES; |
| msg->ref_count = 0; /* make certain it is initialized */ |
| |
| /* free packbuf, but not the memory to which it points */ |
| packbuf->head = NULL; /* CLANG false positive bug here */ |
| FREE_NULL_BUFFER(packbuf); |
| |
| return msg; |
| } |
| |
| |
| |
| static void |
| _route_msg_task_to_client(eio_obj_t *obj) |
| { |
| struct task_read_info *out = (struct task_read_info *)obj->arg; |
| struct client_io_info *client; |
| struct io_buf *msg = NULL; |
| eio_obj_t *eio; |
| list_itr_t *clients; |
| |
| /* Pack task output into messages for transfer to a client */ |
| while (cbuf_used(out->buf) > 0 |
| && _outgoing_buf_free(out->step)) { |
| debug5("cbuf_used = %d", cbuf_used(out->buf)); |
| msg = _task_build_message(out, out->step, out->buf); |
| if (msg == NULL) |
| return; |
| |
| /* Add message to the msg_queue of all clients */ |
| clients = list_iterator_create(out->step->clients); |
| while ((eio = list_next(clients))) { |
| client = (struct client_io_info *)eio->arg; |
| if (client->out_eof == true) |
| continue; |
| |
| /* Some clients only take certain I/O streams */ |
| if (out->type==SLURM_IO_STDOUT) { |
| if (client->ltaskid_stdout != -1 && |
| client->ltaskid_stdout != out->ltaskid) |
| continue; |
| } |
| if (out->type==SLURM_IO_STDERR) { |
| if (client->ltaskid_stderr != -1 && |
| client->ltaskid_stderr != out->ltaskid) |
| continue; |
| } |
| |
| debug5("======================== Enqueued message"); |
| xassert(client->magic == CLIENT_IO_MAGIC); |
| list_enqueue(client->msg_queue, msg); |
| msg->ref_count++; |
| } |
| list_iterator_destroy(clients); |
| |
| /* Update the outgoing message cache */ |
| list_enqueue(out->step->outgoing_cache, msg); |
| msg->ref_count++; |
| _shrink_msg_cache(out->step->outgoing_cache, out->step); |
| } |
| } |
| |
| static void |
| _free_incoming_msg(struct io_buf *msg, stepd_step_rec_t *step) |
| { |
| msg->ref_count--; |
| if (msg->ref_count == 0) { |
| /* Put the message back on the free list */ |
| list_enqueue(step->free_incoming, msg); |
| |
| /* Kick the event IO engine */ |
| eio_signal_wakeup(step->eio); |
| } |
| } |
| |
| static void |
| _free_outgoing_msg(struct io_buf *msg, stepd_step_rec_t *step) |
| { |
| int i; |
| |
| msg->ref_count--; |
| if (msg->ref_count == 0) { |
| /* Put the message back on the free list */ |
| list_enqueue(step->free_outgoing, msg); |
| |
| /* Try packing messages from tasks' output cbufs */ |
| if (step->task == NULL) |
| return; |
| for (i = 0; i < step->node_tasks; i++) { |
| if (step->task[i]->err != NULL) { |
| _route_msg_task_to_client(step->task[i]->err); |
| if (!_outgoing_buf_free(step)) |
| break; |
| } |
| if (step->task[i]->out != NULL) { |
| _route_msg_task_to_client(step->task[i]->out); |
| if (!_outgoing_buf_free(step)) |
| break; |
| } |
| } |
| /* Kick the event IO engine */ |
| eio_signal_wakeup(step->eio); |
| } |
| } |
| |
| static void _free_all_outgoing_msgs(list_t *msg_queue, stepd_step_rec_t *step) |
| { |
| list_itr_t *msgs; |
| struct io_buf *msg; |
| |
| msgs = list_iterator_create(msg_queue); |
| while((msg = list_next(msgs))) { |
| _free_outgoing_msg(msg, step); |
| } |
| list_iterator_destroy(msgs); |
| } |
| |
| /* Close I/O file descriptors created by slurmstepd. The connections have |
| * all been moved to the spawned tasks stdin/out/err file descriptors. */ |
| extern void |
| io_close_task_fds(stepd_step_rec_t *step) |
| { |
| int i; |
| |
| for (i = 0; i < step->node_tasks; i++) { |
| close(step->task[i]->stdin_fd); |
| close(step->task[i]->stdout_fd); |
| close(step->task[i]->stderr_fd); |
| } |
| } |
| |
| void |
| io_close_all(stepd_step_rec_t *step) |
| { |
| int devnull; |
| #if 0 |
| int i; |
| for (i = 0; i < step->node_tasks; i++) |
| _io_finalize(step->task[i]); |
| #endif |
| |
| /* No more debug info will be received by client after this point |
| */ |
| debug("Closing debug channel"); |
| |
| /* |
| * Send stderr to /dev/null since debug channel is closing |
| * and log facility may still try to write to stderr. |
| */ |
| if ((devnull = open("/dev/null", O_RDWR)) < 0) { |
| error("Could not open /dev/null: %m"); |
| } else { |
| if (dup2(devnull, STDERR_FILENO) < 0) |
| error("Unable to dup /dev/null onto stderr"); |
| (void) close(devnull); |
| } |
| |
| /* Signal IO thread to close appropriate |
| * client connections |
| */ |
| eio_signal_shutdown(step->eio); |
| } |
| |
| void |
| io_close_local_fds(stepd_step_rec_t *step) |
| { |
| list_itr_t *clients; |
| eio_obj_t *eio; |
| int rc; |
| struct client_io_info *client; |
| |
| if (step == NULL || step->clients == NULL) |
| return; |
| |
| clients = list_iterator_create(step->clients); |
| while((eio = list_next(clients))) { |
| client = (struct client_io_info *)eio->arg; |
| if (client->is_local_file) { |
| if (eio->fd >= 0) { |
| do { |
| rc = close(eio->fd); |
| } while (rc == -1 && errno == EINTR); |
| eio->fd = -1; |
| } |
| } |
| } |
| list_iterator_destroy(clients); |
| } |
| |
| |
| |
| static void * |
| _io_thr(void *arg) |
| { |
| stepd_step_rec_t *step = (stepd_step_rec_t *) arg; |
| int rc; |
| |
| debug("IO handler started pid=%lu", (unsigned long) getpid()); |
| rc = eio_handle_mainloop(step->eio); |
| debug("IO handler exited, rc=%d", rc); |
| slurm_mutex_lock(&step->io_mutex); |
| step->io_running = false; |
| slurm_cond_broadcast(&step->io_cond); |
| slurm_mutex_unlock(&step->io_mutex); |
| return (void *)1; |
| } |
| |
| /* |
| * Add a client to the step's client list that will write stdout and/or |
| * stderr from the slurmstepd. The slurmstepd handles the write when |
| * a file is created per node or per task, and the output needs to be |
| * modified in some way, like labelling lines with the task number. |
| */ |
| int |
| io_create_local_client(const char *filename, int file_flags, |
| stepd_step_rec_t *step, bool labelio, |
| int stdout_tasks, int stderr_tasks) |
| { |
| int fd = -1; |
| struct client_io_info *client; |
| eio_obj_t *obj; |
| int tmp; |
| |
| fd = open(filename, file_flags | O_CLOEXEC, 0666); |
| if (fd == -1) { |
| return ESLURMD_IO_ERROR; |
| } |
| |
| /* Now set up the eio object */ |
| client = xmalloc(sizeof(*client)); |
| client->magic = CLIENT_IO_MAGIC; |
| client->step = step; |
| client->msg_queue = list_create(NULL); /* FIXME - destructor */ |
| |
| client->ltaskid_stdout = stdout_tasks; |
| client->ltaskid_stderr = stderr_tasks; |
| client->labelio = labelio; |
| client->is_local_file = true; |
| |
| client->taskid_width = 1; |
| tmp = step->node_tasks - 1; |
| while ((tmp /= 10) > 0) |
| client->taskid_width++; |
| |
| |
| obj = eio_obj_create(fd, &local_file_ops, (void *)client); |
| list_append(step->clients, (void *)obj); |
| eio_new_initial_obj(step->eio, (void *)obj); |
| debug5("Now handling %d IO Client object(s)", list_count(step->clients)); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Create the initial TCP connection back to a waiting client (e.g. srun). |
| * |
| * Since this is the first client connection and the IO engine has not |
| * yet started, we initialize the msg_queue as an empty list and |
| * directly add the eio_obj_t to the eio handle with eio_new_initial_obj. |
| * |
| * We assume that if the port is zero the client does not wish us to connect |
| * an IO stream. |
| */ |
| int |
| io_initial_client_connect(srun_info_t *srun, stepd_step_rec_t *step, |
| int stdout_tasks, int stderr_tasks) |
| { |
| int sock = -1; |
| struct client_io_info *client; |
| eio_obj_t *obj; |
| void *conn = NULL; |
| |
| debug4 ("adding IO connection (logical node rank %d)", step->nodeid); |
| |
| if (!slurm_addr_is_unspec(&srun->ioaddr)) { |
| if (slurm_get_port(&srun->ioaddr) == 0) { |
| debug3("No IO connection requested"); |
| return SLURM_SUCCESS; |
| } |
| debug4("connecting IO back to %pA", &srun->ioaddr); |
| } |
| |
| if (!(conn = slurm_open_msg_conn(&srun->ioaddr, srun->tls_cert))) { |
| error("connect io: %m"); |
| /* XXX retry or silently fail? |
| * fail for now. |
| */ |
| return SLURM_ERROR; |
| } |
| |
| sock = conn_g_get_fd(conn); |
| |
| fd_set_blocking(sock); /* just in case... */ |
| _send_io_init_msg(sock, conn, srun, step, true); |
| |
| debug5(" back from _send_io_init_msg"); |
| fd_set_nonblocking(sock); |
| |
| /* Now set up the eio object */ |
| client = xmalloc(sizeof(*client)); |
| client->magic = CLIENT_IO_MAGIC; |
| client->step = step; |
| client->msg_queue = list_create(NULL); /* FIXME - destructor */ |
| |
| client->ltaskid_stdout = stdout_tasks; |
| client->ltaskid_stderr = stderr_tasks; |
| client->labelio = false; |
| client->taskid_width = 0; |
| client->is_local_file = false; |
| |
| obj = eio_obj_create(sock, &client_ops, (void *)client); |
| obj->conn = conn; |
| list_append(step->clients, (void *)obj); |
| eio_new_initial_obj(step->eio, (void *)obj); |
| debug5("Now handling %d IO Client object(s)", |
| list_count(step->clients)); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * Initiate a TCP connection back to a waiting client (e.g. srun). |
| * |
| * Create a new eio client object and wake up the eio engine so that |
| * it can see the new object. |
| */ |
| int |
| io_client_connect(srun_info_t *srun, stepd_step_rec_t *step) |
| { |
| int sock = -1; |
| struct client_io_info *client; |
| eio_obj_t *obj; |
| void *conn = NULL; |
| |
| debug4 ("adding IO connection (logical node rank %d)", step->nodeid); |
| |
| if (!slurm_addr_is_unspec(&srun->ioaddr)) { |
| debug4("connecting IO back to %pA", &srun->ioaddr); |
| } |
| |
| if (!(conn = slurm_open_msg_conn(&srun->ioaddr, srun->tls_cert))) { |
| error("connect io: %m"); |
| /* XXX retry or silently fail? |
| * fail for now. |
| */ |
| return SLURM_ERROR; |
| } |
| |
| sock = conn_g_get_fd(conn); |
| |
| fd_set_blocking(sock); /* just in case... */ |
| _send_io_init_msg(sock, conn, srun, step, false); |
| |
| debug5(" back from _send_io_init_msg"); |
| fd_set_nonblocking(sock); |
| |
| /* Now set up the eio object */ |
| client = xmalloc(sizeof(*client)); |
| client->magic = CLIENT_IO_MAGIC; |
| client->step = step; |
| client->msg_queue = NULL; /* initialized in _client_writable */ |
| |
| client->ltaskid_stdout = -1; /* accept from all tasks */ |
| client->ltaskid_stderr = -1; /* accept from all tasks */ |
| client->labelio = false; |
| client->taskid_width = 0; |
| client->is_local_file = false; |
| |
| /* client object adds itself to step->clients in _client_writable */ |
| |
| obj = eio_obj_create(sock, &client_ops, (void *)client); |
| obj->conn = conn; |
| eio_new_obj(step->eio, (void *)obj); |
| |
| debug5("New IO Client object added"); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static int _send_io_init_msg(int sock, void *conn, srun_info_t *srun, |
| stepd_step_rec_t *step, bool init) |
| { |
| io_init_msg_t msg; |
| |
| msg.io_key = xstrdup(srun->key); |
| msg.nodeid = step->nodeid; |
| msg.version = srun->protocol_version; |
| |
| /* |
| * The initial message does not need the node_offset it is needed for |
| * sattach |
| */ |
| if (!init && (step->step_id.step_het_comp != NO_VAL)) |
| msg.nodeid += step->het_job_node_offset; |
| |
| if (step->stdout_eio_objs == NULL) |
| msg.stdout_objs = 0; |
| else |
| msg.stdout_objs = list_count(step->stdout_eio_objs); |
| if (step->stderr_eio_objs == NULL) |
| msg.stderr_objs = 0; |
| else |
| msg.stderr_objs = list_count(step->stderr_eio_objs); |
| |
| if (io_init_msg_write_to_fd(sock, conn, &msg) != SLURM_SUCCESS) { |
| error("Couldn't sent slurm_io_init_msg"); |
| xfree(msg.io_key); |
| return SLURM_ERROR; |
| } |
| |
| xfree(msg.io_key); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * dup the appropriate file descriptors onto the task's |
| * stdin, stdout, and stderr. |
| * |
| * Close the server's end of the stdio pipes. |
| */ |
| int |
| io_dup_stdio(stepd_step_task_info_t *t) |
| { |
| if (dup2(t->stdin_fd, STDIN_FILENO ) < 0) { |
| error("dup2(stdin): %m"); |
| return SLURM_ERROR; |
| } |
| fd_set_noclose_on_exec(STDIN_FILENO); |
| |
| if (dup2(t->stdout_fd, STDOUT_FILENO) < 0) { |
| error("dup2(stdout): %m"); |
| return SLURM_ERROR; |
| } |
| fd_set_noclose_on_exec(STDOUT_FILENO); |
| |
| if (dup2(t->stderr_fd, STDERR_FILENO) < 0) { |
| error("dup2(stderr): %m"); |
| return SLURM_ERROR; |
| } |
| fd_set_noclose_on_exec(STDERR_FILENO); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| static void |
| _send_eof_msg(struct task_read_info *out) |
| { |
| struct client_io_info *client; |
| struct io_buf *msg = NULL; |
| eio_obj_t *eio; |
| list_itr_t *clients; |
| io_hdr_t header; |
| buf_t *packbuf; |
| |
| debug4("Entering _send_eof_msg"); |
| out->eof_msg_sent = true; |
| |
| if (_outgoing_buf_free(out->step)) { |
| msg = list_dequeue(out->step->free_outgoing); |
| } else { |
| /* eof message must be allowed to allocate new memory |
| because _task_readable() will return "true" until |
| the eof message is enqueued. For instance, if |
| a poll returns POLLHUP on the incoming task pipe, |
| put there are no outgoing message buffers available, |
| the slurmstepd will start spinning. */ |
| msg = _alloc_io_buf(); |
| } |
| |
| header.type = out->type; |
| header.ltaskid = out->ltaskid; |
| header.gtaskid = out->gtaskid; |
| header.length = 0; /* eof */ |
| |
| packbuf = create_buf(msg->data, IO_HDR_PACKET_BYTES); |
| if (!packbuf) { |
| fatal("Failure to allocate memory for a message header"); |
| return; /* Fix for CLANG false positive error */ |
| } |
| |
| io_hdr_pack(&header, packbuf); |
| msg->length = IO_HDR_PACKET_BYTES + header.length; |
| msg->ref_count = 0; /* make certain it is initialized */ |
| |
| /* free packbuf, but not the memory to which it points */ |
| packbuf->head = NULL; /* CLANG false positive bug here */ |
| FREE_NULL_BUFFER(packbuf); |
| |
| /* Add eof message to the msg_queue of all clients */ |
| clients = list_iterator_create(out->step->clients); |
| while ((eio = list_next(clients))) { |
| client = (struct client_io_info *)eio->arg; |
| debug5("======================== Enqueued eof message"); |
| xassert(client->magic == CLIENT_IO_MAGIC); |
| |
| /* Send eof message to all clients */ |
| list_enqueue(client->msg_queue, msg); |
| msg->ref_count++; |
| } |
| list_iterator_destroy(clients); |
| if (msg->ref_count == 0) |
| _free_io_buf(msg); |
| |
| debug4("Leaving _send_eof_msg"); |
| } |
| |
| |
| |
| static struct io_buf *_task_build_message(struct task_read_info *out, |
| stepd_step_rec_t *step, cbuf_t *cbuf) |
| { |
| struct io_buf *msg; |
| char *ptr; |
| buf_t *packbuf; |
| bool must_truncate = false; |
| int avail; |
| io_hdr_t header; |
| int n; |
| bool buffered_stdio = step->flags & LAUNCH_BUFFERED_IO; |
| |
| debug4("%s: Entering...", __func__); |
| |
| if (_outgoing_buf_free(step)) { |
| msg = list_dequeue(step->free_outgoing); |
| } else { |
| return NULL; |
| } |
| |
| ptr = msg->data + IO_HDR_PACKET_BYTES; |
| |
| if (buffered_stdio) { |
| avail = cbuf_peek_line(cbuf, ptr, SLURM_IO_MAX_MSG_LEN, 1); |
| if (avail >= SLURM_IO_MAX_MSG_LEN) |
| must_truncate = true; |
| else if (avail == 0 && cbuf_used(cbuf) >= SLURM_IO_MAX_MSG_LEN) |
| must_truncate = true; |
| } |
| |
| debug5("%s: buffered_stdio is %s", __func__, |
| buffered_stdio ? "true" : "false"); |
| debug5("%s: must_truncate is %s", __func__, |
| must_truncate ? "true" : "false"); |
| |
| /* |
| * If eof has been read from a tasks stdout or stderr, we need to |
| * ignore normal line buffering and send the buffer immediately. |
| * Hence the "|| out->eof". |
| */ |
| if (must_truncate || !buffered_stdio || out->eof) { |
| n = cbuf_read(cbuf, ptr, SLURM_IO_MAX_MSG_LEN); |
| } else { |
| n = cbuf_read_line(cbuf, ptr, SLURM_IO_MAX_MSG_LEN, -1); |
| if (n == 0) { |
| debug5(" partial line in buffer, ignoring"); |
| debug4("Leaving _task_build_message"); |
| list_enqueue(step->free_outgoing, msg); |
| return NULL; |
| } |
| } |
| |
| header.type = out->type; |
| header.ltaskid = out->ltaskid; |
| header.gtaskid = out->gtaskid; |
| header.length = n; |
| |
| debug4("%s: header.length = %d", __func__, n); |
| packbuf = create_buf(msg->data, IO_HDR_PACKET_BYTES); |
| if (!packbuf) { |
| fatal("Failure to allocate memory for a message header"); |
| return msg; /* Fix for CLANG false positive error */ |
| } |
| io_hdr_pack(&header, packbuf); |
| msg->length = IO_HDR_PACKET_BYTES + header.length; |
| msg->ref_count = 0; /* make certain it is initialized */ |
| |
| /* free packbuf, but not the memory to which it points */ |
| packbuf->head = NULL; /* CLANG false positive bug here */ |
| FREE_NULL_BUFFER(packbuf); |
| |
| debug4("%s: Leaving", __func__); |
| return msg; |
| } |
| |
| static struct io_buf *_alloc_io_buf(void) |
| { |
| struct io_buf *buf = xmalloc(sizeof(*buf)); |
| |
| buf->ref_count = 0; |
| buf->length = 0; |
| /* The following "+ 1" is just temporary so I can stick a \0 at |
| the end and do a printf of the data pointer */ |
| buf->data = xmalloc(SLURM_IO_MAX_MSG_LEN + IO_HDR_PACKET_BYTES + 1); |
| |
| return buf; |
| } |
| |
| static void _free_io_buf(struct io_buf *buf) |
| { |
| if (buf) { |
| if (buf->data) |
| xfree(buf->data); |
| xfree(buf); |
| } |
| } |
| |
| /* This just determines if there's space to hold more of the stdin stream */ |
| static bool |
| _incoming_buf_free(stepd_step_rec_t *step) |
| { |
| struct io_buf *buf; |
| |
| if (list_count(step->free_incoming) > 0) { |
| return true; |
| } else if (step->incoming_count < STDIO_MAX_FREE_BUF) { |
| buf = _alloc_io_buf(); |
| list_enqueue(step->free_incoming, buf); |
| step->incoming_count++; |
| return true; |
| } |
| |
| return false; |
| } |
| |
| static bool |
| _outgoing_buf_free(stepd_step_rec_t *step) |
| { |
| struct io_buf *buf; |
| |
| if (list_count(step->free_outgoing) > 0) { |
| return true; |
| } else if (step->outgoing_count < STDIO_MAX_FREE_BUF) { |
| buf = _alloc_io_buf(); |
| list_enqueue(step->free_outgoing, buf); |
| step->outgoing_count++; |
| return true; |
| } |
| |
| return false; |
| } |
| |
| void |
| io_find_filename_pattern( stepd_step_rec_t *step, |
| slurmd_filename_pattern_t *outpattern, |
| slurmd_filename_pattern_t *errpattern, |
| bool *same_out_err_files ) |
| { |
| int ii, jj; |
| int of_num_null = 0, ef_num_null = 0; |
| int of_num_devnull = 0, ef_num_devnull = 0; |
| int of_lastnull = -1, ef_lastnull = -1; |
| bool of_all_same = true, ef_all_same = true; |
| bool of_all_unique = true, ef_all_unique = true; |
| |
| *outpattern = SLURMD_UNKNOWN; |
| *errpattern = SLURMD_UNKNOWN; |
| *same_out_err_files = false; |
| |
| for (ii = 0; ii < step->node_tasks; ii++) { |
| if (step->task[ii]->ofname == NULL) { |
| of_num_null++; |
| of_lastnull = ii; |
| } else if (xstrcmp(step->task[ii]->ofname, "/dev/null")==0) { |
| of_num_devnull++; |
| } |
| |
| if (step->task[ii]->efname == NULL) { |
| ef_num_null++; |
| ef_lastnull = ii; |
| } else if (xstrcmp(step->task[ii]->efname, "/dev/null")==0) { |
| ef_num_devnull++; |
| } |
| } |
| if (of_num_null == step->node_tasks) |
| *outpattern = SLURMD_ALL_NULL; |
| |
| if (ef_num_null == step->node_tasks) |
| *errpattern = SLURMD_ALL_NULL; |
| |
| if (of_num_null == 1 && of_num_devnull == step->node_tasks-1) |
| *outpattern = SLURMD_ONE_NULL; |
| |
| if (ef_num_null == 1 && ef_num_devnull == step->node_tasks-1) |
| *errpattern = SLURMD_ONE_NULL; |
| |
| if (*outpattern == SLURMD_ALL_NULL && *errpattern == SLURMD_ALL_NULL) |
| *same_out_err_files = true; |
| |
| if (*outpattern == SLURMD_ONE_NULL && *errpattern == SLURMD_ONE_NULL && |
| of_lastnull == ef_lastnull) |
| *same_out_err_files = true; |
| |
| if (*outpattern != SLURMD_UNKNOWN && *errpattern != SLURMD_UNKNOWN) |
| return; |
| |
| for (ii = 1; ii < step->node_tasks; ii++) { |
| if (!step->task[ii]->ofname || !step->task[0]->ofname || |
| xstrcmp(step->task[ii]->ofname, step->task[0]->ofname) != 0) |
| of_all_same = false; |
| |
| if (!step->task[ii]->efname || !step->task[0]->efname || |
| xstrcmp(step->task[ii]->efname, step->task[0]->efname) != 0) |
| ef_all_same = false; |
| } |
| |
| if (of_all_same && *outpattern == SLURMD_UNKNOWN) |
| *outpattern = SLURMD_ALL_SAME; |
| |
| if (ef_all_same && *errpattern == SLURMD_UNKNOWN) |
| *errpattern = SLURMD_ALL_SAME; |
| |
| if (step->task[0]->ofname && step->task[0]->efname && |
| xstrcmp(step->task[0]->ofname, step->task[0]->efname)==0) |
| *same_out_err_files = true; |
| |
| if (*outpattern != SLURMD_UNKNOWN && *errpattern != SLURMD_UNKNOWN) |
| return; |
| |
| for (ii = 0; ii < step->node_tasks-1; ii++) { |
| for (jj = ii+1; jj < step->node_tasks; jj++) { |
| |
| if (!step->task[ii]->ofname || |
| !step->task[jj]->ofname || |
| xstrcmp(step->task[ii]->ofname, |
| step->task[jj]->ofname) == 0) |
| of_all_unique = false; |
| |
| if (!step->task[ii]->efname || |
| !step->task[jj]->efname || |
| xstrcmp(step->task[ii]->efname, |
| step->task[jj]->efname) == 0) |
| ef_all_unique = false; |
| } |
| } |
| |
| if (of_all_unique) |
| *outpattern = SLURMD_ALL_UNIQUE; |
| |
| if (ef_all_unique) |
| *errpattern = SLURMD_ALL_UNIQUE; |
| |
| if (of_all_unique && ef_all_unique) { |
| *same_out_err_files = true; |
| for (ii = 0; ii < step->node_tasks; ii++) { |
| if (step->task[ii]->ofname && |
| step->task[ii]->efname && |
| xstrcmp(step->task[ii]->ofname, |
| step->task[ii]->efname) != 0) { |
| *same_out_err_files = false; |
| break; |
| } |
| } |
| } |
| } |
| |
| |
| int |
| io_get_file_flags(stepd_step_rec_t *step) |
| { |
| int file_flags; |
| |
| /* set files for opening stdout/err */ |
| if (step->open_mode == OPEN_MODE_APPEND) |
| file_flags = O_CREAT|O_WRONLY|O_APPEND; |
| else if (step->open_mode == OPEN_MODE_TRUNCATE) |
| file_flags = O_CREAT|O_WRONLY|O_APPEND|O_TRUNC; |
| else if (slurm_conf.job_file_append) |
| file_flags = O_CREAT|O_WRONLY|O_APPEND; |
| else |
| file_flags = O_CREAT|O_WRONLY|O_APPEND|O_TRUNC; |
| |
| return file_flags; |
| } |