| /*****************************************************************************\ |
| * io.c - definitions for connection I/O in connection manager |
| ***************************************************************************** |
| * Copyright (C) SchedMD LLC. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #define _GNU_SOURCE |
| #include <limits.h> |
| #include <sys/uio.h> |
| |
| #include "slurm/slurm_errno.h" |
| |
| #include "src/common/fd.h" |
| #include "src/common/macros.h" |
| #include "src/common/pack.h" |
| #include "src/common/read_config.h" |
| #include "src/common/slurm_time.h" |
| #include "src/common/xmalloc.h" |
| |
| #include "src/conmgr/conmgr.h" |
| #include "src/conmgr/mgr.h" |
| |
| #define DEFAULT_READ_BYTES 512 |
| |
| /* |
| * Default number of write()s to queue up using the stack instead of xmalloc(). |
| * Avoid the slow down from calling xmalloc() on a majority of the writev()s. |
| */ |
| #define IOV_STACK_COUNT 16 |
| #define HANDLE_WRITEV_ARGS_MAGIC 0x1a4afb40 |
| typedef struct { |
| int magic; /* HANDLE_WRITEV_ARGS_MAGIC */ |
| int index; |
| const int iov_count; |
| conmgr_fd_t *con; |
| struct iovec *iov; |
| ssize_t wrote; |
| } handle_writev_args_t; |
| |
| extern void resize_input_buffer(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| int rc; |
| uint64_t bytes = (uint64_t) arg; |
| conmgr_fd_t *con = conmgr_args.con; |
| |
| if (conmgr_args.status == CONMGR_WORK_STATUS_CANCELLED) |
| return; |
| |
| xassert(bytes > 0); |
| xassert(bytes < MAX_MSG_SIZE); |
| |
| if (!(rc = try_grow_buf_remaining(con->in, bytes))) |
| return; |
| |
| log_flag(NET, "%s: [%s] unable to increase buffer %"PRIu64" bytes for RPC message: %s", |
| __func__, con->name, bytes, slurm_strerror(rc)); |
| |
| /* conmgr will be unable to read entire RPC -> close connection now */ |
| close_con(false, con); |
| } |
| |
| static int _get_fd_readable(conmgr_fd_t *con) |
| { |
| int readable = 0; |
| |
| if (fd_get_readable_bytes(con->input_fd, &readable, con->name) || |
| !readable) { |
| if (con->mss != NO_VAL) |
| readable = con->mss; |
| else |
| readable = DEFAULT_READ_BYTES; |
| } |
| |
| /* |
| * Limit read byte count to avoid creating huge buffers from a huge MSS |
| * on a loopback device or a buggy device driver. |
| */ |
| readable = MIN(readable, MAX_MSG_SIZE); |
| |
| /* |
| * Even if there are zero bytes to read, we want to make sure that we |
| * already try to do the read to avoid a shutdown(SHUT_RDWR) file |
| * descriptor never getting the final read()=0. |
| */ |
| readable = MAX(readable, DEFAULT_READ_BYTES); |
| |
| return readable; |
| } |
| |
| extern void read_input(conmgr_fd_t *con, buf_t *buf, const char *what) |
| { |
| ssize_t read_c; |
| int rc, readable; |
| |
| con_unset_flag(con, FLAG_CAN_READ); |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| if (con->input_fd < 0) { |
| log_flag(NET, "%s: [%s] called on closed connection", |
| __func__, con->name); |
| return; |
| } |
| |
| readable = _get_fd_readable(con); |
| |
| /* Grow buffer as needed to handle the incoming data */ |
| if ((rc = try_grow_buf_remaining(buf, readable))) { |
| error("%s: [%s] unable to allocate larger %s: %s", |
| __func__, con->name, what, slurm_strerror(rc)); |
| close_con(false, con); |
| return; |
| } |
| |
| /* check for errors with a NULL read */ |
| if ((read_c = read(con->input_fd, |
| (get_buf_data(buf) + get_buf_offset(buf)), |
| readable)) < 0) { |
| if (errno == EAGAIN || errno == EWOULDBLOCK) { |
| log_flag(NET, "%s: [%s] socket would block on read", |
| __func__, con->name); |
| return; |
| } |
| |
| log_flag(NET, "%s: [%s] error while reading: %m", |
| __func__, con->name); |
| |
| close_con(false, con); |
| return; |
| } |
| |
| /* Always update read timestamp on read() success */ |
| if (con_flag(con, FLAG_WATCH_READ_TIMEOUT)) |
| con->last_read = timespec_now(); |
| |
| if (read_c == 0) { |
| log_flag(NET, "%s: [%s] read EOF with %u bytes to process already in %s", |
| __func__, con->name, get_buf_offset(buf), what); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| /* lock to tell mgr that we are done */ |
| con_set_flag(con, FLAG_READ_EOF); |
| slurm_mutex_unlock(&mgr.mutex); |
| } else { |
| log_flag(NET, "%s: [%s] read %zd bytes with %u bytes to process already in %s", |
| __func__, con->name, read_c, get_buf_offset(buf), |
| what); |
| log_flag_hex(NET_RAW, |
| (get_buf_data(buf) + get_buf_offset(buf)), |
| read_c, "%s: [%s] read", __func__, con->name); |
| |
| set_buf_offset(buf, (get_buf_offset(buf) + read_c)); |
| } |
| } |
| |
| extern void handle_read(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(!con_flag(con, FLAG_TLS_CLIENT) || !con->tls); |
| xassert(!con_flag(con, FLAG_TLS_SERVER) || !con->tls); |
| xassert(!con->tls); |
| |
| read_input(con, con->in, "input buffer"); |
| } |
| |
| static int _foreach_add_writev_iov(void *x, void *arg) |
| { |
| buf_t *out = x; |
| handle_writev_args_t *args = arg; |
| struct iovec *iov = &args->iov[args->index]; |
| |
| xassert(out->magic == BUF_MAGIC); |
| xassert(args->magic == HANDLE_WRITEV_ARGS_MAGIC); |
| |
| if (args->index >= args->iov_count) |
| return -1; |
| |
| iov->iov_base = ((void *) get_buf_data(out)) + get_buf_offset(out); |
| iov->iov_len = remaining_buf(out); |
| |
| log_flag(CONMGR, "%s: [%s] queued writev[%d] %u/%u bytes to outgoing fd %u", |
| __func__, args->con->name, args->index, remaining_buf(out), |
| size_buf(out), args->con->output_fd); |
| |
| args->index++; |
| return 0; |
| } |
| |
| static int _foreach_writev_flush_bytes(void *x, void *arg) |
| { |
| buf_t *out = x; |
| handle_writev_args_t *args = arg; |
| |
| xassert(out->magic == BUF_MAGIC); |
| xassert(args->magic == HANDLE_WRITEV_ARGS_MAGIC); |
| xassert(args->wrote >= 0); |
| |
| if (!args->wrote) |
| return 0; |
| |
| if (args->wrote >= remaining_buf(out)) { |
| log_flag(NET, "%s: [%s] completed write[%d] of %u/%u bytes to outgoing fd %u", |
| __func__, args->con->name, args->index, |
| remaining_buf(out), size_buf(out), |
| args->con->output_fd); |
| log_flag_hex_range(NET_RAW, get_buf_data(out), size_buf(out), |
| get_buf_offset(out), size_buf(out), |
| "%s: [%s] completed write[%d] of %u/%u bytes", |
| __func__, args->con->name, args->index, |
| remaining_buf(out), size_buf(out)); |
| |
| args->wrote -= remaining_buf(out); |
| args->index++; |
| return 1; |
| } else { |
| log_flag(CONMGR, "%s: [%s] partial write[%d] of %zd/%u bytes to outgoing fd %u", |
| __func__, args->con->name, args->index, |
| args->wrote, size_buf(out), args->con->output_fd); |
| log_flag_hex_range(NET_RAW, get_buf_data(out), size_buf(out), |
| get_buf_offset(out), args->wrote, |
| "%s: [%s] partial write[%d] of %zd/%u bytes", |
| __func__, args->con->name, args->index, |
| args->wrote, remaining_buf(out)); |
| |
| set_buf_offset(out, get_buf_offset(out) + args->wrote); |
| args->wrote = 0; |
| args->index++; |
| return 0; |
| } |
| } |
| |
| extern void write_output(conmgr_fd_t *con, const int out_count, list_t *out) |
| { |
| const int iov_count = MIN(IOV_MAX, out_count); |
| struct iovec iov_stack[IOV_STACK_COUNT]; |
| handle_writev_args_t args = { |
| .magic = HANDLE_WRITEV_ARGS_MAGIC, |
| .iov_count = iov_count, |
| .con = con, |
| .iov = iov_stack, |
| }; |
| |
| /* Try to use stack for small write counts when possible */ |
| if (iov_count > ARRAY_SIZE(iov_stack)) |
| args.iov = xcalloc(iov_count, sizeof(*args.iov)); |
| |
| (void) list_for_each_ro(out, _foreach_add_writev_iov, &args); |
| xassert(args.index == iov_count); |
| |
| args.wrote = writev(con->output_fd, args.iov, iov_count); |
| |
| if (args.wrote == -1) { |
| if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { |
| log_flag(NET, "%s: [%s] retry write: %m", |
| __func__, con->name); |
| } else { |
| error("%s: [%s] writev(%d) failed: %m", |
| __func__, con->name, con->output_fd); |
| /* drop outbound data on the floor */ |
| list_flush(out); |
| close_con(false, con); |
| close_con_output(false, con); |
| } |
| } else if (args.wrote == 0) { |
| log_flag(NET, "%s: [%s] wrote 0 bytes", __func__, con->name); |
| } else { |
| log_flag(NET, "%s: [%s] wrote %zd bytes", |
| __func__, con->name, args.wrote); |
| |
| args.index = 0; |
| (void) list_delete_all(out, _foreach_writev_flush_bytes, |
| &args); |
| xassert(!args.wrote); |
| |
| if (con_flag(con, FLAG_WATCH_WRITE_TIMEOUT)) |
| con->last_write = timespec_now(); |
| } |
| |
| if (args.iov != iov_stack) |
| xfree(args.iov); |
| } |
| |
| extern void handle_write(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| int out_count; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(!con_flag(con, FLAG_TLS_CLIENT) || !con->tls); |
| xassert(!con_flag(con, FLAG_TLS_SERVER)); |
| xassert(!con->tls); |
| |
| if (!(out_count = list_count(con->out))) |
| log_flag(CONMGR, "%s: [%s] skipping attempt with zero writes", |
| __func__, con->name); |
| else |
| write_output(con, out_count, con->out); |
| } |
| |
| extern void wrap_on_data(conmgr_callback_args_t conmgr_args, void *arg) |
| { |
| conmgr_fd_t *con = conmgr_args.con; |
| int avail = get_buf_offset(con->in); |
| int size = size_buf(con->in); |
| int rc; |
| int (*callback)(conmgr_fd_t *con, void *arg) = NULL; |
| const char *callback_string = NULL; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| /* override buffer offset to allow reading */ |
| set_buf_offset(con->in, 0); |
| /* override buffer size to only read up to previous offset */ |
| con->in->size = avail; |
| |
| if (con->type == CON_TYPE_RAW) { |
| callback = con->events->on_data; |
| callback_string = XSTRINGIFY(con->events->on_data); |
| } else if (con->type == CON_TYPE_RPC) { |
| callback = on_rpc_connection_data; |
| callback_string = XSTRINGIFY(on_rpc_connection_data); |
| } else { |
| fatal("%s: invalid type", __func__); |
| } |
| |
| log_flag(CONMGR, "%s: [%s] BEGIN func=%s(arg=0x%"PRIxPTR")@0x%"PRIxPTR, |
| __func__, con->name, callback_string, (uintptr_t) con->arg, |
| (uintptr_t) callback); |
| |
| rc = callback(con, con->arg); |
| |
| log_flag(CONMGR, "%s: [%s] END func=%s(arg=0x%"PRIxPTR")@0x%"PRIxPTR"=[%d]%s", |
| __func__, con->name, callback_string, (uintptr_t) con->arg, |
| (uintptr_t) callback, rc, slurm_strerror(rc)); |
| |
| if (rc) { |
| error("%s: [%s] on_data returned rc: %s", |
| __func__, con->name, slurm_strerror(rc)); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| if (mgr.exit_on_error) |
| mgr.shutdown_requested = true; |
| |
| if (!mgr.error) |
| mgr.error = rc; |
| slurm_mutex_unlock(&mgr.mutex); |
| |
| /* |
| * processing data failed so drop any |
| * pending data on the floor |
| */ |
| log_flag(CONMGR, "%s: [%s] on_data callback failed. Purging the remaining %d bytes of pending input.", |
| __func__, con->name, get_buf_offset(con->in)); |
| set_buf_offset(con->in, 0); |
| |
| close_con(false, con); |
| return; |
| } |
| |
| if (get_buf_offset(con->in) < size_buf(con->in)) { |
| if (get_buf_offset(con->in) > 0) { |
| log_flag(CONMGR, "%s: [%s] partial read %u/%u bytes.", |
| __func__, con->name, get_buf_offset(con->in), |
| size_buf(con->in)); |
| |
| /* |
| * not all data read, need to shift it to start of |
| * buffer and fix offset |
| */ |
| memmove(get_buf_data(con->in), |
| (get_buf_data(con->in) + |
| get_buf_offset(con->in)), |
| remaining_buf(con->in)); |
| |
| /* reset start of offset to end of previous data */ |
| set_buf_offset(con->in, remaining_buf(con->in)); |
| } else { |
| /* need more data for parser to read */ |
| log_flag(CONMGR, "%s: [%s] parser refused to read %u bytes. Waiting for more data.", |
| __func__, con->name, size_buf(con->in)); |
| |
| con_set_flag(con, FLAG_ON_DATA_TRIED); |
| |
| /* revert offset change */ |
| set_buf_offset(con->in, avail); |
| } |
| } else |
| /* buffer completely read: reset it */ |
| set_buf_offset(con->in, 0); |
| |
| /* restore original size */ |
| con->in->size = size; |
| } |
| |
| extern int conmgr_queue_write_data(conmgr_fd_t *con, const void *buffer, |
| const size_t bytes) |
| { |
| buf_t *buf; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| /* Ignore empty write requests */ |
| if (!bytes) |
| return SLURM_SUCCESS; |
| |
| buf = init_buf(bytes); |
| |
| /* TODO: would be nice to avoid this copy */ |
| memmove(get_buf_data(buf), buffer, bytes); |
| |
| log_flag(NET, "%s: [%s] write of %zu bytes queued", |
| __func__, con->name, bytes); |
| |
| log_flag_hex(NET_RAW, get_buf_data(buf), get_buf_offset(buf), |
| "%s: queuing up write", __func__); |
| |
| list_append(con->out, buf); |
| |
| if (con_flag(con, FLAG_WATCH_WRITE_TIMEOUT)) |
| con->last_write = timespec_now(); |
| |
| slurm_mutex_lock(&mgr.mutex); |
| EVENT_SIGNAL(&mgr.watch_sleep); |
| slurm_mutex_unlock(&mgr.mutex); |
| return SLURM_SUCCESS; |
| } |
| |
| static int _get_input_buffer(const conmgr_fd_t *con, const void **data_ptr, |
| size_t *bytes_ptr) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(con_flag(con, FLAG_WORK_ACTIVE)); |
| |
| if (!con->in) |
| return ENOENT; |
| |
| if (data_ptr) |
| *data_ptr = get_buf_data(con->in) + get_buf_offset(con->in); |
| *bytes_ptr = size_buf(con->in); |
| |
| return SLURM_SUCCESS; |
| } |
| |
| extern void conmgr_fd_get_in_buffer(const conmgr_fd_t *con, |
| const void **data_ptr, size_t *bytes_ptr) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| |
| (void) _get_input_buffer(con, data_ptr, bytes_ptr); |
| } |
| |
| extern int conmgr_con_get_input_buffer(conmgr_fd_ref_t *ref, |
| const void **data_ptr, size_t *bytes_ptr) |
| { |
| xassert(ref); |
| xassert(data_ptr || bytes_ptr); |
| xassert(ref->magic == MAGIC_CON_MGR_FD_REF); |
| |
| return _get_input_buffer(ref->con, data_ptr, bytes_ptr); |
| } |
| |
| static int _con_get_shadow_in_buffer(const conmgr_fd_t *con, buf_t **buf_ptr) |
| { |
| buf_t *buffer = NULL; |
| void *data = NULL; |
| size_t bytes = 0; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(con->type == CON_TYPE_RAW); |
| xassert(con_flag(con, FLAG_WORK_ACTIVE)); |
| |
| if (!con->in) |
| return EEXIST; |
| |
| data = (get_buf_data(con->in) + con->in->processed); |
| bytes = (size_buf(con->in) - con->in->processed); |
| |
| if (!(buffer = create_shadow_buf(data, bytes))) |
| return ENOMEM; |
| |
| xassert(!*buf_ptr); |
| *buf_ptr = buffer; |
| return SLURM_SUCCESS; |
| } |
| |
| extern buf_t *conmgr_fd_shadow_in_buffer(const conmgr_fd_t *con) |
| { |
| buf_t *buffer = NULL; |
| |
| (void) _con_get_shadow_in_buffer(con, &buffer); |
| |
| return buffer; |
| } |
| |
| extern int conmgr_con_shadow_in_buffer(conmgr_fd_ref_t *ref, buf_t **buf_ptr) |
| { |
| xassert(ref); |
| xassert(ref->magic == MAGIC_CON_MGR_FD_REF); |
| |
| return _con_get_shadow_in_buffer(ref->con, buf_ptr); |
| } |
| |
| static int _mark_consumed_in_buffer(const conmgr_fd_t *con, size_t bytes) |
| { |
| ssize_t offset = -1; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(con_flag(con, FLAG_WORK_ACTIVE)); |
| |
| if (!con->in) |
| return ENOENT; |
| |
| offset = get_buf_offset(con->in) + bytes; |
| xassert(offset <= size_buf(con->in)); |
| |
| set_buf_offset(con->in, offset); |
| return SLURM_SUCCESS; |
| } |
| |
| extern void conmgr_fd_mark_consumed_in_buffer(const conmgr_fd_t *con, |
| size_t bytes) |
| { |
| (void) _mark_consumed_in_buffer(con, bytes); |
| } |
| |
| extern int conmgr_con_mark_consumed_input_buffer(conmgr_fd_ref_t *ref, |
| const size_t bytes) |
| { |
| xassert(ref); |
| xassert(ref->magic == MAGIC_CON_MGR_FD_REF); |
| |
| return _mark_consumed_in_buffer(ref->con, bytes); |
| } |
| |
| extern int conmgr_fd_xfer_in_buffer(const conmgr_fd_t *con, |
| buf_t **buffer_ptr) |
| { |
| const void *data = (get_buf_data(con->in) + get_buf_offset(con->in)); |
| const size_t bytes = (size_buf(con->in) - get_buf_offset(con->in)); |
| buf_t *buf = NULL; |
| int rc; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(con->type == CON_TYPE_RAW); |
| xassert(con_flag(con, FLAG_WORK_ACTIVE)); |
| |
| xassert(buffer_ptr); |
| if (!buffer_ptr) |
| return EINVAL; |
| |
| /* |
| * Create buffer if needed and size it size of the data to copy or the |
| * minimal buffer size to avoid multiple recalloc()s in the future. |
| */ |
| if (!*buffer_ptr && |
| !(*buffer_ptr = init_buf(MAX(bytes, BUFFER_START_SIZE)))) |
| return ENOMEM; |
| |
| buf = *buffer_ptr; |
| |
| /* grow buffer to size to hold incoming data (if needed) */ |
| if ((rc = try_grow_buf_remaining(buf, bytes))) |
| return rc; |
| |
| /* Append data to existing buffer */ |
| memcpy((get_buf_data(buf) + get_buf_offset(buf)), data, bytes); |
| set_buf_offset(buf, (get_buf_offset(buf) + bytes)); |
| |
| /* mark connection input buffer as fully consumed */ |
| set_buf_offset(con->in, size_buf(con->in)); |
| return SLURM_SUCCESS; |
| } |
| |
| extern int conmgr_fd_xfer_out_buffer(conmgr_fd_t *con, buf_t *output) |
| { |
| int rc; |
| |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(con->type == CON_TYPE_RAW); |
| xassert(!output || (output->magic == BUF_MAGIC)); |
| |
| if (!output || !size_buf(output) || !get_buf_offset(output)) |
| return SLURM_SUCCESS; |
| |
| xassert(size_buf(output) <= xsize(get_buf_data(output))); |
| xassert(get_buf_offset(output) <= size_buf(output)); |
| |
| rc = conmgr_queue_write_data(con, get_buf_data(output), |
| get_buf_offset(output)); |
| |
| if (!rc) |
| set_buf_offset(output, 0); |
| |
| return rc; |
| } |
| |
| extern int conmgr_fd_get_input_fd(conmgr_fd_t *con) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(con_flag(con, FLAG_WORK_ACTIVE)); |
| return con->input_fd; |
| } |
| |
| extern int conmgr_fd_get_output_fd(conmgr_fd_t *con) |
| { |
| xassert(con->magic == MAGIC_CON_MGR_FD); |
| xassert(con_flag(con, FLAG_WORK_ACTIVE)); |
| return con->output_fd; |
| } |