/*****************************************************************************\
 **  pmixp_io.c - PMIx non-blocking I/O routines
 *****************************************************************************
 *  Copyright (C) 2014-2015 Artem Polyakov. All rights reserved.
 *  Copyright (C) 2015-2017 Mellanox Technologies. All rights reserved.
 *  Written by Artem Polyakov <artpol84@gmail.com, artemp@mellanox.com>.
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version. If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>		/* close() */
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <errno.h>

#include "pmixp_common.h"
#include "pmixp_io.h"
#include "pmixp_debug.h"
#include "pmixp_utils.h"

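/*
 * Overview: a pmixp_io_engine_t wraps one non-blocking socket with a
 * staged receive state machine (padding -> packed header -> payload)
 * and a mutex-protected send queue whose completion callbacks are
 * deferred to pmixp_io_send_cleanup(). A minimal usage sketch
 * (hypothetical caller; assumes the pmixp_io_attach() counterpart of
 * pmixp_io_detach() declared in pmixp_io.h and an already connected
 * socket "fd"):
 *
 *	pmixp_io_engine_t eng;
 *	pmixp_io_init(&eng, hdr);	// hdr: filled pmixp_p2p_data_t
 *	pmixp_io_attach(&eng, fd);
 *	pmixp_io_rcvd_progress(&eng);	// e.g. on POLLIN
 *	if (pmixp_io_rcvd_ready(&eng))
 *		payload = pmixp_io_rcvd_extract(&eng, &host_hdr);
 */
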
static inline void _rcvd_clear_counters(pmixp_io_engine_t *eng);

static void
_verify_transceiver(pmixp_p2p_data_t header)
{
	/* sanity checks common to both directions */
	if (!header.payload_size_cb) {
		PMIXP_ERROR("No payload size callback provided");
		goto check_fail;
	}

	/* receiver sanity checks */
	if (header.recv_on) {
		if (0 == header.rhdr_host_size) {
			PMIXP_ERROR("Bad host header size");
			goto check_fail;
		}
		if (0 == header.rhdr_net_size) {
			PMIXP_ERROR("Bad net header size");
			goto check_fail;
		}
		if (NULL == header.hdr_unpack_cb) {
			PMIXP_ERROR("No header unpack callback provided");
			goto check_fail;
		}
	}

	/* sender sanity checks */
	if (header.send_on) {
		if (!header.buf_ptr) {
			PMIXP_ERROR("No message pointer callback provided");
			goto check_fail;
		}
		if (!header.buf_size) {
			PMIXP_ERROR("No message size callback provided");
			goto check_fail;
		}
		if (!header.send_complete) {
			PMIXP_ERROR("No message free callback provided");
			goto check_fail;
		}
	}
	return;
check_fail:
	abort();
}

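/* Initialize an I/O engine with the transport description "header".
 * Mandatory callbacks are verified by _verify_transceiver(), which
 * aborts on failure; the engine ends up in PMIXP_IO_INIT with no
 * socket attached yet. */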
void pmixp_io_init(pmixp_io_engine_t *eng,
		   pmixp_p2p_data_t header)
{
	memset(eng, 0, sizeof(*eng));
	/* Initialize general options */
#ifndef NDEBUG
	eng->magic = PMIXP_MSGSTATE_MAGIC;
#endif
	eng->error = 0;
	eng->h = header;
	eng->io_state = PMIXP_IO_INIT;

	_verify_transceiver(header);

	/* Init receiver */
	if (eng->h.recv_on) {
		_rcvd_clear_counters(eng);
		/* we are going to receive data */
		eng->rcvd_hdr_net = xmalloc(eng->h.rhdr_net_size);
		eng->rcvd_hdr_host = xmalloc(eng->h.rhdr_host_size);
	} else {
		/* receiver won't be used */
		eng->rcvd_hdr_host = NULL;
		eng->rcvd_hdr_net = NULL;
	}

	/* Init transmitter */
	slurm_mutex_init(&eng->send_lock);
	eng->send_current = NULL;
	eng->send_offs = eng->send_msg_size = 0;
	eng->send_msg_ptr = NULL;
	eng->send_queue = list_create(NULL);
	eng->complete_queue = list_create(NULL);
}

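/* Drop a partially received message and release every outgoing
 * message (queued, in-flight, or awaiting completion) through the
 * send_complete() callback. */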
static inline void
_pmixp_io_drop_messages(pmixp_io_engine_t *eng)
{
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);

	if (eng->h.recv_on) {
		if (NULL != eng->rcvd_payload) {
			xfree(eng->rcvd_payload);
		}
		_rcvd_clear_counters(eng);
	}

	if (eng->h.send_on) {
		void *msg = NULL;

		/* complete all messages */
		pmixp_io_send_cleanup(eng, PMIXP_P2P_REGULAR);

		/* drop all outstanding messages */
		while ((msg = list_dequeue(eng->send_queue))) {
			eng->h.send_complete(msg, PMIXP_P2P_REGULAR,
					     SLURM_SUCCESS);
		}

		if (NULL != eng->send_current) {
			eng->h.send_complete(eng->send_current,
					     PMIXP_P2P_REGULAR, SLURM_SUCCESS);
			eng->send_current = NULL;
		}
		eng->send_msg_ptr = NULL;
		eng->send_offs = eng->send_msg_size = 0;
	}
}

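/* Detach the socket from the engine: drop all in-flight messages,
 * return the descriptor to the caller and reset the engine state
 * back to PMIXP_IO_INIT. */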
int pmixp_io_detach(pmixp_io_engine_t *eng)
{
	int ret = 0;
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(PMIXP_IO_OPERATING == eng->io_state ||
		PMIXP_IO_CONN_CLOSED == eng->io_state);
	_pmixp_io_drop_messages(eng);
	ret = eng->sd;
	eng->sd = -1;
	eng->io_state = PMIXP_IO_INIT;
	return ret;
}

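/* Release all engine resources and close the socket if it is still
 * attached. "err" < 0 records (-err) as the engine error code; the
 * engine is marked PMIXP_IO_FINALIZED afterwards. */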
void pmixp_io_finalize(pmixp_io_engine_t *eng, int err)
{
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);

	switch (eng->io_state) {
	case PMIXP_IO_FINALIZED:
		/* avoid double finalization */
		PMIXP_ERROR("Attempt to finalize already finalized I/O engine");
		return;
	case PMIXP_IO_OPERATING:
	case PMIXP_IO_CONN_CLOSED:
		close(eng->sd);
		eng->sd = -1;
		/* fall through to init cleanup */
	case PMIXP_IO_INIT:
		/* release all messages in progress */
		_pmixp_io_drop_messages(eng);

		/* Release all receiver resources */
		if (eng->h.recv_on) {
			xfree(eng->rcvd_hdr_net);
			xfree(eng->rcvd_hdr_host);
			eng->rcvd_hdr_net = NULL;
			eng->rcvd_hdr_host = NULL;
		}

		/* Release all sender resources */
		if (eng->h.send_on) {
			FREE_NULL_LIST(eng->send_queue);
			FREE_NULL_LIST(eng->complete_queue);
			eng->send_msg_size = eng->send_offs = 0;
		}
		break;
	case PMIXP_IO_NONE:
		PMIXP_ERROR("Attempt to finalize non-initialized I/O engine");
		break;
	default:
		PMIXP_ERROR("I/O engine was damaged, unknown state: %d",
			    (int)eng->io_state);
		break;
	}

	/* mark as finalized so the double-finalization check above works */
	eng->io_state = PMIXP_IO_FINALIZED;
	if (err < 0) {
		eng->error = -err;
	} else {
		eng->error = 0;
	}
}

/* Receiver */

static inline void _rcvd_clear_counters(pmixp_io_engine_t *eng)
{
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);

	eng->rcvd_pad_recvd = 0;
	eng->rcvd_hdr_offs = 0;
	eng->rcvd_pay_offs = eng->rcvd_pay_size = 0;
	eng->rcvd_payload = NULL;
}

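/* The packed (network) header is fully received: unpack it into the
 * host header, query the payload size and allocate the payload
 * buffer. Returns 0 on success or the unpack error code. */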
static inline int _rcvd_switch_to_body(pmixp_io_engine_t *eng)
{
	int rc;
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(NULL != eng->rcvd_hdr_net);
	xassert(pmixp_io_operating(eng));
	xassert(eng->h.recv_on);
	xassert(eng->h.rhdr_net_size == eng->rcvd_hdr_offs);

	eng->rcvd_pay_offs = eng->rcvd_pay_size = 0;
	eng->rcvd_payload = NULL;
	rc = eng->h.hdr_unpack_cb(eng->rcvd_hdr_net, eng->rcvd_hdr_host);
	if (0 != rc) {
		PMIXP_ERROR_NO(rc, "Cannot unpack message header");
		return rc;
	}
	eng->rcvd_pay_size = eng->h.payload_size_cb(eng->rcvd_hdr_host);
	if (0 != eng->rcvd_pay_size) {
		eng->rcvd_payload = xmalloc(eng->rcvd_pay_size);
	}
	return 0;
}

static inline bool _rcvd_have_padding(pmixp_io_engine_t *eng)
{
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(NULL != eng->rcvd_hdr_net);
	xassert(pmixp_io_operating(eng));
	xassert(eng->h.recv_on);
	return eng->h.recv_padding &&
		(eng->rcvd_pad_recvd < eng->h.recv_padding);
}

static inline bool _rcvd_need_header(pmixp_io_engine_t *eng)
{
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(NULL != eng->rcvd_hdr_net);
	xassert(pmixp_io_operating(eng));
	xassert(eng->h.recv_on);

	return eng->rcvd_hdr_offs < eng->h.rhdr_net_size;
}

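/* Non-blocking receive progress. Consumes the stream in three stages:
 * (1) skip the optional padding, (2) read the packed header and switch
 * to the body, (3) read the payload. Returns early whenever the socket
 * would block; sets PMIXP_IO_CONN_CLOSED if the peer disconnected. */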
void pmixp_io_rcvd_progress(pmixp_io_engine_t *eng)
{
	size_t size, remain;
	void *offs;
	int shutdown;
	int fd;

	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(NULL != eng->rcvd_hdr_net);
	xassert(pmixp_io_operating(eng));
	xassert(eng->h.recv_on);

	fd = eng->sd;

	if (!pmixp_io_operating(eng)) {
		return;
	}

	if (pmixp_io_rcvd_ready(eng)) {
		/* nothing to do,
		 * first the current message has to be extracted
		 */
		return;
	}
	/* Drop padding first so it won't corrupt the message */
	if (_rcvd_have_padding(eng)) {
		char buf[eng->h.recv_padding];
		size = eng->h.recv_padding;
		remain = size - eng->rcvd_pad_recvd;
		eng->rcvd_pad_recvd +=
			pmixp_read_buf(fd, buf, remain,
				       &shutdown, false);
		if (shutdown) {
			eng->io_state = PMIXP_IO_CONN_CLOSED;
			return;
		}
		if (eng->rcvd_pad_recvd < size) {
			/* normal return. receive the rest of the
			 * padding later
			 */
			return;
		}
	}

	if (_rcvd_need_header(eng)) {
		/* need to finish with the header */
		size = eng->h.rhdr_net_size;
		remain = size - eng->rcvd_hdr_offs;
		offs = eng->rcvd_hdr_net + eng->rcvd_hdr_offs;
		eng->rcvd_hdr_offs +=
			pmixp_read_buf(fd, offs, remain,
				       &shutdown, false);
		if (shutdown) {
			eng->io_state = PMIXP_IO_CONN_CLOSED;
			return;
		}
		if (_rcvd_need_header(eng)) {
			/* normal return. receive another portion
			 * of the header later */
			return;
		}
		/* if we are here then the header is received and we
		 * can adjust the buffer */
		if ((shutdown = _rcvd_switch_to_body(eng))) {
			/* drop # of received bytes to identify that
			 * the message is not ready
			 */
			eng->rcvd_hdr_offs = 0;
			eng->io_state = PMIXP_IO_CONN_CLOSED;
			return;
		}
		/* go ahead with body receive */
	}
	/* we are receiving the body */
	xassert(eng->rcvd_hdr_offs == eng->h.rhdr_net_size);
	if (0 == eng->rcvd_pay_size) {
		/* zero-byte message. Exit so we will hit
		 * pmixp_io_rcvd_ready */
		return;
	}
	size = eng->rcvd_pay_size;
	remain = size - eng->rcvd_pay_offs;
	eng->rcvd_pay_offs +=
		pmixp_read_buf(fd,
			       eng->rcvd_payload + eng->rcvd_pay_offs,
			       remain, &shutdown, false);
	if (shutdown) {
		eng->io_state = PMIXP_IO_CONN_CLOSED;
		return;
	}
	if (eng->rcvd_pay_offs != size) {
		/* normal return. receive another portion later */
		return;
	}
	PMIXP_DEBUG("Message is ready for processing!");
	return;
}

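/* Hand the fully received message to the caller: copy the unpacked
 * header into "header", return the payload buffer (ownership passes
 * to the caller) and reset the counters for the next message. */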
void *pmixp_io_rcvd_extract(pmixp_io_engine_t *eng, void *header)
{
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(NULL != eng->rcvd_hdr_net);
	xassert(pmixp_io_operating(eng));
	xassert(eng->h.recv_on);
	xassert(pmixp_io_rcvd_ready(eng));

	if (!pmixp_io_operating(eng)) {
		return NULL;
	}
	void *ptr = eng->rcvd_payload;
	memcpy(header, eng->rcvd_hdr_host, (size_t)eng->h.rhdr_host_size);
	/* Drop the message state to receive a new one */
	_rcvd_clear_counters(eng);
	return ptr;
}

/* Transmitter */

static inline int _send_set_current(pmixp_io_engine_t *eng, void *msg)
{
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(pmixp_io_enqueue_ok(eng));
	xassert(eng->h.send_on);
	xassert(NULL == eng->send_current);

	/* Remember the message being sent */
	eng->send_current = msg;

	/* Setup payload for sending */
	eng->send_msg_ptr = eng->h.buf_ptr(msg);
	eng->send_msg_size = eng->h.buf_size(msg);

	/* Setup send offset */
	eng->send_offs = 0;
	return 0;
}

static inline void _send_free_current(pmixp_io_engine_t *eng)
{
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(pmixp_io_enqueue_ok(eng));
	xassert(eng->h.send_on);
	xassert(NULL != eng->send_current);

	eng->send_msg_ptr = NULL;
	eng->send_msg_size = eng->send_offs = 0;
	/* Do not call the callback now. Defer it to the
	 * progress thread
	 */
	list_enqueue(eng->complete_queue, eng->send_current);
	eng->send_current = NULL;
}

static inline int _send_msg_ok(pmixp_io_engine_t *eng)
{
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(pmixp_io_enqueue_ok(eng));
	xassert(eng->h.send_on);

	return (eng->send_current != NULL)
		&& (eng->send_offs == eng->send_msg_size);
}

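/* Ensure there is an active outgoing message: complete the current
 * one if it was fully sent and promote the next queued message.
 * Returns true if there is still data to transmit. */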
static bool _send_pending(pmixp_io_engine_t *eng)
{
	int rc;
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(pmixp_io_enqueue_ok(eng));
	xassert(eng->h.send_on);

	if (!pmixp_io_enqueue_ok(eng)) {
		return false;
	}

	if (_send_msg_ok(eng)) {
		/* The current message has been sent. Clean it up */
		_send_free_current(eng);
	}

	if (eng->send_current == NULL) {
		/* Try the next element */
		int n = list_count(eng->send_queue);
		if (0 == n) {
			/* Nothing to do */
			return false;
		}
		void *msg = list_dequeue(eng->send_queue);
		xassert(msg != NULL);
		if ((rc = _send_set_current(eng, msg))) {
			PMIXP_ERROR_NO(rc, "Cannot switch to the next message");
			pmixp_io_finalize(eng, rc);
		}
	}
	return true;
}

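/* Push the active message to the socket until it would block, taking
 * the next queued messages as the current ones complete. Must be
 * called with send_lock held. */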
static void _send_progress(pmixp_io_engine_t *eng)
{
	int fd;

	xassert(NULL != eng);
	xassert(eng->magic == PMIXP_MSGSTATE_MAGIC);

	fd = eng->sd;

	if (!pmixp_io_operating(eng)) {
		/* no progress until in the operational mode */
		return;
	}

	while (_send_pending(eng)) {
		/* try to send everything until the fd becomes
		 * non-writable.
		 * FIXME: maybe restrict the number of messages
		 * sent at once
		 */
		int shutdown = 0;
		struct iovec iov[2];
		int iovcnt = 0;
		size_t written = 0;

		/* only the message buffer is used here; the array
		 * leaves room for a second entry */
		iov[iovcnt].iov_base = eng->send_msg_ptr;
		iov[iovcnt].iov_len = eng->send_msg_size;
		iovcnt++;

		written = pmixp_writev_buf(fd, iov, iovcnt,
					   eng->send_offs, &shutdown);
		if (shutdown) {
			pmixp_io_finalize(eng, shutdown);
			break;
		}

		eng->send_offs += written;
		if (!written) {
			break;
		}
	}
}

/* Enqueue a message for sending and try to progress the send
 * immediately */
int pmixp_io_send_enqueue(pmixp_io_engine_t *eng, void *msg)
{
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(pmixp_io_enqueue_ok(eng));
	xassert(eng->h.send_on);

	/* We should be in the proper state
	 * to accept new messages
	 */
	if (!pmixp_io_enqueue_ok(eng)) {
		PMIXP_ERROR("Trying to enqueue to an unprepared engine");
		return SLURM_ERROR;
	}
	list_enqueue(eng->send_queue, msg);

	/* if nothing is being sent right now -
	 * try to progress immediately
	 */
	slurm_mutex_lock(&eng->send_lock);
	_send_progress(eng);
	slurm_mutex_unlock(&eng->send_lock);
	pmixp_io_send_cleanup(eng, PMIXP_P2P_INLINE);

	return SLURM_SUCCESS;
}

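/* Like pmixp_io_send_enqueue(), but place "msg" at the head of the
 * send queue so it goes out before regularly enqueued messages.
 * No immediate progress is attempted here. */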
int pmixp_io_send_urgent(pmixp_io_engine_t *eng, void *msg)
{
	xassert(NULL != eng);
	xassert(PMIXP_MSGSTATE_MAGIC == eng->magic);
	xassert(pmixp_io_enqueue_ok(eng));
	xassert(eng->h.send_on);

	/* We should be in the proper state
	 * to accept new messages
	 */
	if (!pmixp_io_enqueue_ok(eng)) {
		PMIXP_ERROR("Trying to enqueue to an unprepared engine");
		return SLURM_ERROR;
	}

	/* Put this message at the head of the queue */
	list_push(eng->send_queue, msg);

	return SLURM_SUCCESS;
}

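/* Thread-safe wrapper around _send_pending(): returns true if the
 * engine still has outgoing data to transmit. */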
bool pmixp_io_send_pending(pmixp_io_engine_t *eng)
{
	bool ret;
	slurm_mutex_lock(&eng->send_lock);
	ret = _send_pending(eng);
	slurm_mutex_unlock(&eng->send_lock);
	return ret;
}

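/* Run the deferred send_complete() callbacks for all messages that
 * were fully transmitted (or dropped) since the last cleanup. */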
void pmixp_io_send_cleanup(pmixp_io_engine_t *eng, pmixp_p2p_ctx_t ctx)
{
	void *msg = NULL;
	while ((msg = list_dequeue(eng->complete_queue))) {
		eng->h.send_complete(msg, ctx, SLURM_SUCCESS);
	}
}
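/* Thread-safe send progress: push queued data to the socket and then
 * run the deferred completion callbacks. */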
void pmixp_io_send_progress(pmixp_io_engine_t *eng)
{
	slurm_mutex_lock(&eng->send_lock);
	_send_progress(eng);
	slurm_mutex_unlock(&eng->send_lock);
	pmixp_io_send_cleanup(eng, PMIXP_P2P_REGULAR);
}