blob: 3875a0337dfd912c83e82e2eeae770e527b1b377 [file] [log] [blame]
/*****************************************************************************\
** pmix_conn.c - module for fd-based connections progress
*****************************************************************************
* Copyright (C) 2017 Mellanox Technologies. All rights reserved.
* Written by Artem Polyakov <artpol84@gmail.com, artemp@mellanox.com>.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "pmixp_conn.h"
/* temporal engines local API */
static void _tmp_engines_init();
static void _tmp_engines_fini();
static inline pmixp_io_engine_t *_tmp_engines_get_slurm();
static inline void _tmp_engines_return_slurm(pmixp_io_engine_t *eng);
static inline pmixp_io_engine_t *_tmp_engines_get_direct();
static inline void _tmp_engines_return_direct(pmixp_io_engine_t *eng);
static void _msg_handler_destruct(void *obj);
static list_t *_conn_list, *_empty_hndl_list;
static pmixp_p2p_data_t _slurm_hdr, _direct_hdr;
void pmixp_conn_init(pmixp_p2p_data_t slurm_hdr,
pmixp_p2p_data_t direct_hdr)
{
_conn_list = list_create(_msg_handler_destruct);
_empty_hndl_list = list_create(_msg_handler_destruct);
_slurm_hdr = slurm_hdr;
_direct_hdr = direct_hdr;
_tmp_engines_init();
}
void pmixp_conn_fini(void)
{
FREE_NULL_LIST(_conn_list);
FREE_NULL_LIST(_empty_hndl_list);
_tmp_engines_fini();
}
static void _msg_handler_destruct(void *obj)
{
pmixp_conn_t *conn = (pmixp_conn_t*)obj;
switch (conn->type) {
case PMIXP_CONN_TEMP:
pmixp_io_finalize(conn->eng, 0);
xfree(conn->eng);
conn->eng = NULL;
break;
case PMIXP_CONN_EMPTY:
case PMIXP_CONN_PERSIST:
/* - Persistent handlers have their engines allocated
* somewhere else. Don't release it here
* - empty handlers don't carry any data.
*/
break;
default:
/* this shouldn't happen! */
PMIXP_ERROR("Bad message handler connection type: %d",
(int)conn->type);
abort();
}
xfree(conn);
}
void pmixp_conn_cleanup(void)
{
list_itr_t *it = list_iterator_create(_conn_list);
pmixp_conn_t *hndl = NULL;
while ((hndl = list_next(it))) {
if (PMIXP_CONN_EMPTY == hndl->type) {
/* move this handler to the empty list */
list_remove(it);
list_append(_empty_hndl_list, hndl);
}
}
}
pmixp_conn_t *pmixp_conn_new_temp(pmixp_conn_proto_t proto, int fd,
pmixp_conn_new_msg_cb_t nmsg_cb)
{
xassert(proto == PMIXP_PROTO_SLURM || proto == PMIXP_PROTO_DIRECT);
pmixp_conn_t *conn = list_pop(_empty_hndl_list);
if (NULL == conn) {
conn = xmalloc(sizeof(*conn));
}
xassert(PMIXP_PROTO_NONE == conn->proto);
xassert(NULL == conn->eng);
xassert(NULL == conn->rcv_progress_cb);
conn->type = PMIXP_CONN_TEMP;
conn->proto = proto;
/* grab the temp I/O engine of the corresponding type */
switch (proto) {
case PMIXP_PROTO_SLURM:
conn->eng = _tmp_engines_get_slurm();
break;
case PMIXP_PROTO_DIRECT:
conn->eng = _tmp_engines_get_direct();
break;
default:
/* should not happen */
PMIXP_ERROR("Bad protocol type: %d", proto);
abort();
}
pmixp_io_attach(conn->eng, fd);
conn->rcv_progress_cb = nmsg_cb;
conn->ret_cb = NULL;
conn->ret_data = NULL;
conn->hdr = NULL;
return conn;
}
pmixp_conn_t *pmixp_conn_new_persist(pmixp_conn_proto_t proto,
pmixp_io_engine_t *eng,
pmixp_conn_new_msg_cb_t nmsg_cb,
pmixp_conn_ret_cb_t ret_cb, void *ret_data)
{
xassert(proto == PMIXP_PROTO_SLURM || proto == PMIXP_PROTO_DIRECT);
xassert(NULL != eng);
pmixp_conn_t *conn = list_pop(_empty_hndl_list);
if (NULL == conn) {
conn = xmalloc(sizeof(*conn));
}
xassert(PMIXP_PROTO_NONE == conn->proto);
xassert(NULL == conn->eng);
xassert(NULL == conn->rcv_progress_cb);
conn->type = PMIXP_CONN_PERSIST;
conn->proto = proto;
conn->eng = eng;
conn->rcv_progress_cb = nmsg_cb;
conn->ret_cb = ret_cb;
conn->ret_data = ret_data;
conn->hdr = NULL;
return conn;
}
void pmixp_conn_return(pmixp_conn_t *conn)
{
/* if this is a temp connection - return I/O engine */
if (NULL != conn->ret_cb) {
conn->ret_cb(conn);
}
if (NULL != conn->hdr) {
xfree(conn->hdr);
}
switch (conn->type){
case PMIXP_CONN_PERSIST:
/* corresponding I/O engine was allocated somewhere else */
break;
case PMIXP_CONN_TEMP: {
if (pmixp_io_conn_closed(conn->eng)) {
int fd = pmixp_io_detach(conn->eng);
close(fd);
}
/* grab the temp I/O engine of the corresponding type */
switch (conn->proto) {
case PMIXP_PROTO_SLURM:
_tmp_engines_return_slurm(conn->eng);
break;
case PMIXP_PROTO_DIRECT:
_tmp_engines_return_direct(conn->eng);
break;
default:
/* should not happen */
PMIXP_ERROR("Bad protocol type: %d", conn->proto);
abort();
}
break;
}
default:
/* should not happen */
PMIXP_ERROR("Bad connection type: %d", conn->type);
abort();
}
/* this handler will soon be garbage-collected */
memset(conn, 0, sizeof(*conn));
conn->type = PMIXP_CONN_EMPTY;
}
/*
* --------------------- temporal I/O engines -------------------
*/
static void _temp_engine_destruct(void *obj);
static list_t *_slurm_engines, *_direct_engines;
static void _tmp_engines_init()
{
_slurm_engines = list_create(_temp_engine_destruct);
_direct_engines = list_create(_temp_engine_destruct);
}
static void _tmp_engines_fini()
{
FREE_NULL_LIST(_slurm_engines);
FREE_NULL_LIST(_direct_engines);
}
static void _temp_engine_destruct(void *obj)
{
pmixp_io_engine_t *eng = (pmixp_io_engine_t*)obj;
pmixp_io_finalize(eng, 0);
xfree(eng);
}
static inline pmixp_io_engine_t *_tmp_engines_get_slurm()
{
pmixp_io_engine_t *eng = list_pop(_slurm_engines);
if (NULL == eng){
eng = xmalloc(sizeof(*eng));
pmixp_io_init(eng, _slurm_hdr);
}
return eng;
}
static inline void _tmp_engines_return_slurm(pmixp_io_engine_t *eng)
{
xassert(NULL != eng);
list_push(_slurm_engines, eng);
}
static inline pmixp_io_engine_t *_tmp_engines_get_direct()
{
pmixp_io_engine_t *eng = list_pop(_direct_engines);
if (NULL == eng){
eng = xmalloc(sizeof(*eng));
pmixp_io_init(eng, _direct_hdr);
}
return eng;
}
static inline void _tmp_engines_return_direct(pmixp_io_engine_t *eng)
{
xassert(NULL != eng);
list_push(_direct_engines, eng);
}