/*****************************************************************************\
** pmix_server.c - PMIx server side functionality
*****************************************************************************
* Copyright (C) 2014-2015 Artem Polyakov. All rights reserved.
* Copyright (C) 2015-2020 Mellanox Technologies. All rights reserved.
* Written by Artem Polyakov <artpol84@gmail.com, artemp@mellanox.com>,
* Boris Karasev <karasev.b@gmail.com, boriska@mellanox.com>.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "pmixp_common.h"
#include "pmixp_info.h"
#include "pmixp_coll.h"
#include "pmixp_debug.h"
#include "pmixp_io.h"
#include "pmixp_client.h"
#include "pmixp_server.h"
#include "pmixp_nspaces.h"
#include "pmixp_state.h"
#include "pmixp_client.h"
#include "pmixp_dmdx.h"
#include "pmixp_conn.h"
#include "pmixp_dconn.h"
#include "src/interfaces/auth.h"
#include "src/interfaces/conn.h"
#define PMIXP_DEBUG_SERVER 1
/*
* --------------------- I/O protocol -------------------
*/
#define PMIXP_SERVER_MSG_MAGIC 0xCAFECA11
typedef struct {
uint32_t magic;
uint32_t type;
uint32_t seq;
uint32_t nodeid;
uint32_t msgsize;
uint8_t ext_flag;
} pmixp_base_hdr_t;
#define PMIXP_BASE_HDR_SIZE (5 * sizeof(uint32_t) + sizeof(uint8_t))
#define PMIXP_BASE_HDR_EXT_SIZE(ep_len) (sizeof(uint32_t) + ep_len)
#define PMIXP_BASE_HDR_MAX (PMIXP_BASE_HDR_SIZE + \
PMIXP_BASE_HDR_EXT_SIZE(pmixp_dconn_ep_len()))
/* In the server buffer we have one service field of type uint32_t */
#define PMIXP_SERVER_BUFFER_OFFS (PMIXP_BASE_HDR_MAX + sizeof(uint32_t))
typedef struct {
uint32_t size; /* Has to be first (appended by Slurm API) */
pmixp_base_hdr_t shdr;
} pmixp_slurm_rhdr_t;
#define PMIXP_SAPI_RECV_HDR_SIZE (sizeof(uint32_t) + PMIXP_BASE_HDR_SIZE)
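/*
* Wire layout recap (an illustrative sketch derived from the definitions
* above; "ep_len" stands for pmixp_dconn_ep_len() and is configuration
* dependent):
*
*   base header (PMIXP_BASE_HDR_SIZE = 21 bytes):
*     magic(4) | type(4) | seq(4) | nodeid(4) | msgsize(4) | ext_flag(1)
*   optional extension, present iff ext_flag != 0
*   (PMIXP_BASE_HDR_EXT_SIZE(ep_len) bytes):
*     ep_len(4) | endpoint data(ep_len)
*   payload:
*     msgsize bytes
*
* Messages arriving through the Slurm API are additionally prefixed with
* a uint32_t size field, hence PMIXP_SAPI_RECV_HDR_SIZE.
*/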
#define PMIXP_BASE_HDR_SETUP(bhdr, mtype, mseq, buf) \
{ \
bhdr.magic = PMIXP_SERVER_MSG_MAGIC; \
bhdr.type = mtype; \
bhdr.msgsize = get_buf_offset(buf) - PMIXP_SERVER_BUFFER_OFFS; \
bhdr.seq = mseq; \
bhdr.nodeid = pmixp_info_nodeid_job(); \
bhdr.ext_flag = 0; \
}
#define PMIXP_SERVER_BUF_MAGIC 0xCA11CAFE
buf_t *pmixp_server_buf_new(void)
{
size_t offset = PMIXP_SERVER_BUFFER_OFFS;
buf_t *buf = create_buf(xmalloc(offset), offset);
uint32_t *service = (uint32_t*)get_buf_data(buf);
/* Use the first uint32_t cell to store the payload
* offset. The value 0 has the special meaning that the
* buffer hasn't been finalized yet
*/
service[0] = 0;
#ifdef PMIXP_DEBUG_SERVER
xassert( PMIXP_BASE_HDR_MAX >= sizeof(uint32_t));
/* Make sure that we only use buffers allocated through
* this call, because we reserve the space for the
* header here
*/
service[1] = PMIXP_SERVER_BUF_MAGIC;
#endif
/* Skip header. It will be filled right before the sending */
set_buf_offset(buf, offset);
return buf;
}
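/*
* Layout of a freshly created server buffer (an illustrative sketch of
* the code above):
*
*   bytes [0 .. PMIXP_SERVER_BUFFER_OFFS)  reserved region:
*     - service[0] (the first uint32_t): payload offset, 0 until finalized;
*     - service[1] (debug builds only): PMIXP_SERVER_BUF_MAGIC;
*     - _buf_finalize() later right-aligns the wire header against the
*       payload inside this region.
*   bytes [PMIXP_SERVER_BUFFER_OFFS .. )   user payload
*/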
size_t pmixp_server_buf_reset(buf_t *buf)
{
uint32_t *service = (uint32_t*)get_buf_data(buf);
service[0] = 0;
#ifdef PMIXP_DEBUG_SERVER
xassert( PMIXP_BASE_HDR_MAX >= sizeof(uint32_t));
xassert( PMIXP_BASE_HDR_MAX <= get_buf_offset(buf) );
/* Make sure that we only use buffers allocated through
* this call, because we reserve the space for the
* header here
*/
service[1] = PMIXP_SERVER_BUF_MAGIC;
#endif
set_buf_offset(buf, PMIXP_SERVER_BUFFER_OFFS);
return PMIXP_SERVER_BUFFER_OFFS;
}
static void *_buf_finalize(buf_t *buf, void *nhdr, size_t hsize,
size_t *dsize)
{
size_t offset;
uint32_t *service = (uint32_t*)get_buf_data(buf);
char *ptr = get_buf_data(buf);
if (!service[0]) {
offset = PMIXP_SERVER_BUFFER_OFFS - hsize;
#ifdef PMIXP_DEBUG_SERVER
xassert(PMIXP_BASE_HDR_MAX >= hsize);
xassert(PMIXP_BASE_HDR_MAX <= get_buf_offset(buf));
/* Make sure that we only use buffers allocated through
* pmixp_server_buf_new(), because the space for the
* header was reserved there
*/
xassert(PMIXP_SERVER_BUF_MAGIC == service[1]);
#endif
/* Enough space for any header was reserved at the
* time of buffer initialization in pmixp_server_buf_new();
* put the header in place and return the proper pointer
*/
if (hsize) {
memcpy(ptr + offset, nhdr, hsize);
}
service[0] = offset;
} else {
/* This buffer was already finalized */
offset = service[0];
#ifdef PMIXP_DEBUG_SERVER
/* We expect the header to be the same */
xassert(0 == memcmp(ptr+offset, nhdr, hsize));
#endif
}
*dsize = get_buf_offset(buf) - offset;
return ptr + offset;
}
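/*
* Worked example for _buf_finalize() (the numbers are hypothetical):
* assume ep_len == 8, so PMIXP_BASE_HDR_MAX = 21 + (4 + 8) = 33 and
* PMIXP_SERVER_BUFFER_OFFS = 33 + 4 = 37. Finalizing with a plain header
* (hsize == PMIXP_BASE_HDR_SIZE == 21) copies the header to offset
* 37 - 21 = 16, so the header lands immediately before the payload and
* [header|payload] can be sent as one contiguous region of
* dsize = get_buf_offset(buf) - 16 bytes.
*/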
static void _base_hdr_pack_full(buf_t *packbuf, pmixp_base_hdr_t *hdr)
{
if (hdr->ext_flag) {
hdr->msgsize += PMIXP_BASE_HDR_EXT_SIZE(pmixp_dconn_ep_len());
}
pack32(hdr->magic, packbuf);
pack32(hdr->type, packbuf);
pack32(hdr->seq, packbuf);
pack32(hdr->nodeid, packbuf);
pack32(hdr->msgsize, packbuf);
pack8(hdr->ext_flag, packbuf);
if (hdr->ext_flag) {
packmem(pmixp_dconn_ep_data(), pmixp_dconn_ep_len(), packbuf);
xassert(get_buf_offset(packbuf) ==
(PMIXP_BASE_HDR_SIZE +
PMIXP_BASE_HDR_EXT_SIZE(pmixp_dconn_ep_len())));
}
}
#define WRITE_HDR_FIELD(dst, offset, field) { \
memcpy((dst) + (offset), &(field), sizeof(field)); \
offset += sizeof(field); \
}
static size_t _base_hdr_pack_full_samearch(pmixp_base_hdr_t *hdr, void *net)
{
int offset = 0;
if (hdr->ext_flag) {
hdr->msgsize += PMIXP_BASE_HDR_EXT_SIZE(pmixp_dconn_ep_len());
}
WRITE_HDR_FIELD(net, offset, hdr->magic);
WRITE_HDR_FIELD(net, offset, hdr->type);
WRITE_HDR_FIELD(net, offset, hdr->seq);
WRITE_HDR_FIELD(net, offset, hdr->nodeid);
WRITE_HDR_FIELD(net, offset, hdr->msgsize);
WRITE_HDR_FIELD(net, offset, hdr->ext_flag);
if (hdr->ext_flag) {
buf_t *buf = create_buf(net + offset, PMIXP_BASE_HDR_MAX);
packmem(pmixp_dconn_ep_data(), pmixp_dconn_ep_len(), buf);
offset += get_buf_offset(buf);
buf->head = NULL;
FREE_NULL_BUFFER(buf);
}
return offset;
}
static int _base_hdr_unpack_fixed(buf_t *packbuf, pmixp_base_hdr_t *hdr)
{
if (unpack32(&hdr->magic, packbuf)) {
return -EINVAL;
}
xassert(PMIXP_SERVER_MSG_MAGIC == hdr->magic);
if (unpack32(&hdr->type, packbuf)) {
return -EINVAL;
}
if (unpack32(&hdr->seq, packbuf)) {
return -EINVAL;
}
if (unpack32(&hdr->nodeid, packbuf)) {
return -EINVAL;
}
if (unpack32(&hdr->msgsize, packbuf)) {
return -EINVAL;
}
if (unpack8(&hdr->ext_flag, packbuf)) {
return -EINVAL;
}
return 0;
}
#define READ_HDR_FIELD(src, offset, field) { \
memcpy(&(field), (src) + (offset), sizeof(field)); \
offset += sizeof(field); \
}
static int _base_hdr_unpack_fixed_samearch(void *net, void *host)
{
size_t offset = 0;
pmixp_base_hdr_t *hdr = (pmixp_base_hdr_t *)host;
READ_HDR_FIELD(net, offset, hdr->magic);
READ_HDR_FIELD(net, offset, hdr->type);
READ_HDR_FIELD(net, offset, hdr->seq);
READ_HDR_FIELD(net, offset, hdr->nodeid);
READ_HDR_FIELD(net, offset, hdr->msgsize);
READ_HDR_FIELD(net, offset, hdr->ext_flag);
return 0;
}
static int _base_hdr_unpack_ext(buf_t *packbuf, char **ep_data, uint32_t *ep_len)
{
if (unpackmem_xmalloc(ep_data, ep_len, packbuf)) {
return -EINVAL;
}
return 0;
}
static int _sapi_rhdr_unpack_fixed(buf_t *packbuf, pmixp_slurm_rhdr_t *hdr)
{
if (unpack32(&hdr->size, packbuf)) {
return -EINVAL;
}
if (_base_hdr_unpack_fixed(packbuf, &hdr->shdr)) {
return -EINVAL;
}
return 0;
}
/* Slurm protocol I/O header */
static uint32_t _slurm_proto_msize(void *buf);
static int _slurm_pack_hdr(pmixp_base_hdr_t *hdr, void *net);
static int _slurm_proto_unpack_hdr(void *net, void *host);
static void _slurm_new_msg(pmixp_conn_t *conn, void *_hdr, void *msg);
static int _slurm_send(pmixp_ep_t *ep, pmixp_base_hdr_t bhdr, buf_t *buf);
pmixp_p2p_data_t _slurm_proto = {
/* generic callbacks */
.payload_size_cb = _slurm_proto_msize,
/* receiver-related fields */
.recv_on = 1,
.rhdr_host_size = sizeof(pmixp_slurm_rhdr_t),
.rhdr_net_size = PMIXP_SAPI_RECV_HDR_SIZE, /* need to skip user ID */
.recv_padding = sizeof(uint32_t),
.hdr_unpack_cb = _slurm_proto_unpack_hdr,
};
/* direct protocol I/O header */
static uint32_t _direct_paysize(void *hdr);
static size_t _direct_hdr_pack_portable(pmixp_base_hdr_t *hdr, void *net);
static int _direct_hdr_unpack_portable(void *net, void *host);
static size_t _direct_hdr_pack_samearch(pmixp_base_hdr_t *hdr, void *net);
static int _direct_hdr_unpack_samearch(void *net, void *host);
typedef size_t (*_direct_hdr_pack_t)(pmixp_base_hdr_t *hdr, void *net);
_direct_hdr_pack_t _direct_hdr_pack = _direct_hdr_pack_samearch;
static void *_direct_msg_ptr(void *msg);
static size_t _direct_msg_size(void *msg);
static void _direct_send_complete(void *msg, pmixp_p2p_ctx_t ctx, int rc);
static void _direct_new_msg(void *hdr, buf_t *buf);
static void _direct_new_msg_conn(pmixp_conn_t *conn, void *_hdr, void *msg);
static void _direct_send(pmixp_dconn_t *dconn, pmixp_ep_t *ep,
pmixp_base_hdr_t bhdr, buf_t *buf,
pmixp_server_sent_cb_t complete_cb, void *cb_data);
static void _direct_return_connection(pmixp_conn_t *conn);
typedef struct {
pmixp_base_hdr_t hdr;
void *buffer;
buf_t *buf_ptr;
pmixp_server_sent_cb_t sent_cb;
void *cbdata;
} _direct_proto_message_t;
pmixp_p2p_data_t _direct_proto = {
/* receiver-related fields */
.recv_on = 1,
.rhdr_host_size = sizeof(pmixp_base_hdr_t),
.rhdr_net_size = PMIXP_BASE_HDR_SIZE,
.recv_padding = 0, /* no padding for the direct proto */
.payload_size_cb = _direct_paysize,
.hdr_unpack_cb = _direct_hdr_unpack_samearch,
.new_msg = _direct_new_msg,
/* transmitter-related fields */
.send_on = 1,
.buf_ptr = _direct_msg_ptr,
.buf_size = _direct_msg_size,
.send_complete = _direct_send_complete
};
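/*
* How a receiver consumes pmixp_p2p_data_t (a sketch of the contract
* implied by the fields above; the engine itself lives in pmixp_io.c and
* pmixp_conn.c):
*   1. read rhdr_net_size bytes of the network header;
*   2. hdr_unpack_cb() converts them into a host header of
*      rhdr_host_size bytes;
*   3. payload_size_cb() applied to the host header yields the number of
*      payload bytes that follow (recv_padding accounts for the trailing
*      bytes the Slurm protocol needs to skip);
*   4. the assembled message is delivered through new_msg() or the
*      connection-level handler.
*/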
/*
* --------------------- Init/Finalize -------------------
*/
static volatile int _was_initialized = 0;
int pmixp_stepd_init(const stepd_step_rec_t *step, char ***env)
{
char *path;
int fd, rc;
if (SLURM_SUCCESS != (rc = pmixp_info_set(step, env))) {
PMIXP_ERROR("pmixp_info_set(step, env) failed");
goto err_info;
}
/* Create UNIX socket for slurmd communication */
path = pmixp_info_nspace_usock(pmixp_info_namespace());
if (NULL == path) {
PMIXP_ERROR("pmixp_info_nspace_usock: out-of-memory");
rc = SLURM_ERROR;
goto err_path;
}
if ((fd = pmixp_usock_create_srv(path)) < 0) {
PMIXP_ERROR("pmixp_usock_create_srv");
rc = SLURM_ERROR;
goto err_usock;
}
pmixp_info_srv_usock_set(path, fd);
if (!pmixp_info_same_arch()) {
_direct_proto.hdr_unpack_cb = _direct_hdr_unpack_portable;
_direct_hdr_pack = _direct_hdr_pack_portable;
}
pmixp_conn_init(_slurm_proto, _direct_proto);
if ((rc = pmixp_dconn_init(pmixp_info_nodes_uni(), _direct_proto))) {
PMIXP_ERROR("pmixp_dconn_init() failed");
goto err_dconn;
}
if ((rc = pmixp_nspaces_init())) {
PMIXP_ERROR("pmixp_nspaces_init() failed");
goto err_nspaces;
}
if (SLURM_SUCCESS != (rc = pmixp_state_init())) {
PMIXP_ERROR("pmixp_state_init() failed");
goto err_state;
}
if (SLURM_SUCCESS != (rc = pmixp_dmdx_init())) {
PMIXP_ERROR("pmixp_dmdx_init() failed");
goto err_dmdx;
}
if (SLURM_SUCCESS != (rc = pmixp_libpmix_init())) {
PMIXP_ERROR("pmixp_libpmix_init() failed");
goto err_lib;
}
if (SLURM_SUCCESS != (rc = pmixp_libpmix_job_set())) {
PMIXP_ERROR("pmixp_libpmix_job_set() failed");
goto err_job;
}
pmixp_server_init_pp(env);
pmixp_server_init_cperf(env);
xfree(path);
_was_initialized = 1;
return SLURM_SUCCESS;
err_job:
pmixp_libpmix_finalize();
err_lib:
pmixp_dmdx_finalize();
err_dmdx:
pmixp_state_finalize();
err_state:
pmixp_nspaces_finalize();
err_nspaces:
pmixp_dconn_fini();
err_dconn:
pmixp_conn_fini();
close(pmixp_info_srv_usock_fd());
err_usock:
xfree(path);
err_path:
pmixp_info_free();
err_info:
return rc;
}
int pmixp_stepd_finalize(void)
{
char *path;
if (!_was_initialized) {
/* nothing to do */
return 0;
}
pmixp_libpmix_finalize();
pmixp_dmdx_finalize();
pmixp_conn_fini();
pmixp_dconn_fini();
pmixp_state_finalize();
pmixp_nspaces_finalize();
/* cleanup the UNIX socket */
PMIXP_DEBUG("Remove PMIx plugin usock");
close(pmixp_info_srv_usock_fd());
path = pmixp_info_nspace_usock(pmixp_info_namespace());
unlink(path);
xfree(path);
/* free the information */
pmixp_info_free();
return SLURM_SUCCESS;
}
void pmixp_server_cleanup(void)
{
pmixp_conn_cleanup();
}
/*
* --------------------- Authentication functionality -------------------
*/
static int _auth_cred_create(buf_t *buf, uid_t uid)
{
void *auth_cred = NULL;
int rc = SLURM_SUCCESS;
auth_cred = auth_g_create(AUTH_DEFAULT_INDEX, slurm_conf.authinfo,
uid, NULL, 0);
if (!auth_cred) {
PMIXP_ERROR("Creating authentication credential: %m");
return errno;
}
/*
* We can use SLURM_PROTOCOL_VERSION here since there is no possibility
* of protocol mismatch.
*/
rc = auth_g_pack(auth_cred, buf, SLURM_PROTOCOL_VERSION);
if (rc)
PMIXP_ERROR("Packing authentication credential: %m");
auth_g_destroy(auth_cred);
return rc;
}
static int _auth_cred_verify(buf_t *buf, uid_t *uid)
{
void *auth_cred = NULL;
int rc = SLURM_SUCCESS;
/*
* We can use SLURM_PROTOCOL_VERSION here since there is no possibility
* of protocol mismatch.
*/
auth_cred = auth_g_unpack(buf, SLURM_PROTOCOL_VERSION);
if (!auth_cred) {
PMIXP_ERROR("Unpacking authentication credential: %m");
return SLURM_ERROR;
}
rc = auth_g_verify(auth_cred, slurm_conf.authinfo);
if (rc) {
PMIXP_ERROR("Verifying authentication credential: %m");
} else {
uid_t auth_uid;
auth_uid = auth_g_get_uid(auth_cred);
if ((auth_uid != slurm_conf.slurmd_user_id) &&
(auth_uid != _pmixp_job_info.uid)) {
PMIXP_ERROR("Credential from uid %u", auth_uid);
rc = SLURM_ERROR;
}
*uid = auth_uid;
}
auth_g_destroy(auth_cred);
return rc;
}
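/*
* Credential round trip (an illustrative sketch; in this file the
* initiator side runs in _process_extended_hdr() and the acceptor side
* in _direct_conn_establish()):
*
*	buf_t *buf = pmixp_server_buf_new();
*	uid_t uid = SLURM_AUTH_NOBODY;
*	if (!_auth_cred_create(buf, getuid())) {
*		... buf travels inside a PMIXP_MSG_INIT_DIRECT message ...
*		rc = _auth_cred_verify(buf, &uid);
*	}
*/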
/*
* --------------------- Generic I/O functionality -------------------
*/
static bool _serv_readable(eio_obj_t *obj);
static int _serv_read(eio_obj_t *obj, list_t *objs);
static bool _serv_writable(eio_obj_t *obj);
static int _serv_write(eio_obj_t *obj, list_t *objs);
static void _process_server_request(pmixp_base_hdr_t *hdr, buf_t *buf);
static struct io_operations slurm_peer_ops = {
.readable = _serv_readable,
.handle_read = _serv_read
};
static struct io_operations direct_peer_ops = {
.readable = _serv_readable,
.handle_read = _serv_read,
.writable = _serv_writable,
.handle_write = _serv_write
};
static bool _serv_readable(eio_obj_t *obj)
{
/* sanity check */
xassert(NULL != obj );
if (obj->shutdown) {
/* corresponding connection will be
* cleaned up during plugin finalize
*/
return false;
}
return true;
}
static int _serv_read(eio_obj_t *obj, list_t *objs)
{
/* sanity check */
xassert(NULL != obj );
if (obj->shutdown) {
/* corresponding connection will be
* cleaned up during plugin finalize
*/
return 0;
}
pmixp_conn_t *conn = (pmixp_conn_t *)obj->arg;
bool proceed = true;
/* debug stub */
pmixp_debug_hang(0);
/* Read and process all received messages */
while (proceed) {
if (!pmixp_conn_progress_rcv(conn)) {
proceed = false;
}
if (!pmixp_conn_is_alive(conn)) {
obj->shutdown = true;
PMIXP_DEBUG("Connection closed fd = %d", obj->fd);
pmixp_conn_return(conn);
proceed = false;
}
}
return 0;
}
static bool _serv_writable(eio_obj_t *obj)
{
/* sanity check */
xassert(NULL != obj );
if (obj->shutdown) {
/* corresponding connection will be
* cleaned up during plugin finalize
*/
return false;
}
/* get I/O engine */
pmixp_conn_t *conn = (pmixp_conn_t *)obj->arg;
pmixp_io_engine_t *eng = conn->eng;
/* debug stub */
pmixp_debug_hang(0);
/* Invoke cleanup callbacks if any */
pmixp_io_send_cleanup(eng, PMIXP_P2P_REGULAR);
/* check if we have something to send */
if (pmixp_io_send_pending(eng)) {
return true;
}
return false;
}
static int _serv_write(eio_obj_t *obj, list_t *objs)
{
/* sanity check */
xassert(NULL != obj );
if (obj->shutdown) {
/* corresponding connection will be
* cleaned up during plugin finalize
*/
return 0;
}
PMIXP_DEBUG("fd = %d", obj->fd);
pmixp_conn_t *conn = (pmixp_conn_t *)obj->arg;
/* debug stub */
pmixp_debug_hang(0);
/* progress sends */
pmixp_conn_progress_snd(conn);
/* if we are done with this connection - remove it */
if (!pmixp_conn_is_alive(conn)) {
obj->shutdown = true;
PMIXP_DEBUG("Connection finalized fd = %d", obj->fd);
pmixp_conn_return(conn);
}
return 0;
}
static int _process_extended_hdr(pmixp_base_hdr_t *hdr, buf_t *buf)
{
char nhdr[PMIXP_BASE_HDR_MAX];
bool send_init = false;
size_t dsize = 0, hsize = 0;
pmixp_dconn_t *dconn;
_direct_proto_message_t *init_msg = NULL;
int rc = SLURM_SUCCESS;
char *ep_data = NULL;
uint32_t ep_len = 0;
dconn = pmixp_dconn_lock(hdr->nodeid);
if (!dconn) {
/* Should not happen */
xassert( dconn );
abort();
}
/* Retrieve endpoint information */
_base_hdr_unpack_ext(buf, &ep_data, &ep_len);
/* Check if init message is required to establish
* the connection
*/
if (!pmixp_dconn_require_connect(dconn, &send_init)) {
goto unlock;
}
if (send_init) {
buf_t *buf_init = pmixp_server_buf_new();
pmixp_base_hdr_t bhdr;
init_msg = xmalloc(sizeof(*init_msg));
rc = _auth_cred_create(buf_init, dconn->uid);
if (rc) {
/* buf_ptr is not set yet, so release buf_init itself */
FREE_NULL_BUFFER(buf_init);
xfree(init_msg);
goto unlock;
}
PMIXP_BASE_HDR_SETUP(bhdr, PMIXP_MSG_INIT_DIRECT, 0, buf_init);
bhdr.ext_flag = 1;
hsize = _direct_hdr_pack(&bhdr, nhdr);
init_msg->sent_cb = pmixp_server_sent_buf_cb;
init_msg->cbdata = buf_init;
init_msg->hdr = bhdr;
init_msg->buffer = _buf_finalize(buf_init, nhdr, hsize,
&dsize);
init_msg->buf_ptr = buf_init;
}
rc = pmixp_dconn_connect(dconn, ep_data, ep_len, init_msg);
if (rc) {
PMIXP_ERROR("Unable to connect to %d", dconn->nodeid);
if (init_msg) {
/* need to release `init_msg` here */
FREE_NULL_BUFFER(init_msg->buf_ptr);
xfree(init_msg);
}
goto unlock;
}
switch (pmixp_dconn_progress_type()) {
case PMIXP_DCONN_PROGRESS_SW:{
/* this direct connection has fd that needs to be
* polled to progress, use connection interface for that
*/
pmixp_io_engine_t *eng = pmixp_dconn_engine(dconn);
pmixp_conn_t *conn;
conn = pmixp_conn_new_persist(PMIXP_PROTO_DIRECT, eng,
_direct_new_msg_conn,
_direct_return_connection,
dconn);
if (conn) {
eio_obj_t *obj;
obj = eio_obj_create(pmixp_io_fd(eng),
&direct_peer_ops,
(void *)conn);
eio_new_obj(pmixp_info_io(), obj);
eio_signal_wakeup(pmixp_info_io());
} else {
/* TODO: handle this error */
rc = SLURM_ERROR;
goto unlock;
}
break;
}
case PMIXP_DCONN_PROGRESS_HW: {
break;
}
default:
/* Should not happen */
xassert(0 && pmixp_dconn_progress_type());
/* TODO: handle this error */
}
unlock:
pmixp_dconn_unlock(dconn);
return rc;
}
static void _process_server_request(pmixp_base_hdr_t *hdr, buf_t *buf)
{
int rc;
switch (hdr->type) {
case PMIXP_MSG_FAN_IN:
case PMIXP_MSG_FAN_OUT: {
pmixp_coll_t *coll;
pmix_proc_t *procs = NULL;
size_t nprocs = 0;
pmixp_coll_type_t type = 0;
int c_nodeid;
rc = pmixp_coll_tree_unpack(buf, &type, &c_nodeid,
&procs, &nprocs);
if (SLURM_SUCCESS != rc) {
char *nodename = pmixp_info_job_host(hdr->nodeid);
PMIXP_ERROR("Bad message header from node %s",
nodename);
xfree(nodename);
goto exit;
}
if (PMIXP_COLL_TYPE_FENCE_TREE != type) {
char *nodename = pmixp_info_job_host(hdr->nodeid);
PMIXP_ERROR("Unexpected collective type=%s from node %s, expected=%s",
pmixp_coll_type2str(type), nodename,
pmixp_coll_type2str(PMIXP_COLL_TYPE_FENCE_TREE));
xfree(nodename);
goto exit;
}
coll = pmixp_state_coll_get(type, procs, nprocs);
xfree(procs);
if (!coll) {
PMIXP_ERROR("Unable to pmixp_state_coll_get()");
break;
}
pmixp_coll_sanity_check(coll);
#ifdef PMIXP_COLL_DEBUG
PMIXP_DEBUG("%s collective message from nodeid = %u, type = %s, seq = %d",
pmixp_coll_type2str(type),
hdr->nodeid,
((PMIXP_MSG_FAN_IN == hdr->type) ?
"fan-in" : "fan-out"),
hdr->seq);
#endif
rc = pmixp_coll_check(coll, hdr->seq);
if (PMIXP_COLL_REQ_FAILURE == rc) {
/* this is an unacceptable event: either something went
* really wrong or the state machine is incorrect.
* This will 100% lead to an application hang.
*/
char *nodename = pmixp_info_job_host(hdr->nodeid);
PMIXP_ERROR("Bad collective seq. #%d from %s:%u, current is %d",
hdr->seq, nodename, hdr->nodeid, coll->seq);
pmixp_debug_hang(0); /* enable hang to debug this! */
slurm_kill_job_step(pmixp_info_jobid(),
pmixp_info_stepid(), SIGKILL, 0);
xfree(nodename);
break;
} else if (PMIXP_COLL_REQ_SKIP == rc) {
#ifdef PMIXP_COLL_DEBUG
PMIXP_DEBUG("Wrong collective seq. #%d from nodeid %u, current is %d, skip this message",
hdr->seq, hdr->nodeid, coll->seq);
#endif
goto exit;
}
if (PMIXP_MSG_FAN_IN == hdr->type) {
pmixp_coll_tree_child(coll, hdr->nodeid,
hdr->seq, buf);
} else {
pmixp_coll_tree_parent(coll, hdr->nodeid,
hdr->seq, buf);
}
break;
}
case PMIXP_MSG_DMDX: {
pmixp_dmdx_process(buf, hdr->nodeid, hdr->seq);
/* buf will be free'd by the PMIx callback, so
* protect the data by voiding the buffer.
* Use the statement below instead of (buf = NULL)
* to maintain encapsulation - in general `buf` is
* not a pointer, but an opaque type.
*/
buf = create_buf(NULL, 0);
break;
}
case PMIXP_MSG_INIT_DIRECT:
PMIXP_DEBUG("Direct connection init from %d", hdr->nodeid);
break;
#ifndef NDEBUG
case PMIXP_MSG_PINGPONG: {
/* if the ping-pong mode was activated,
* node 0 sends ping requests
* and the receiver is assumed to respond back to node 0
*/
int msize = remaining_buf(buf);
if (pmixp_info_nodeid()) {
pmixp_server_pp_send(0, msize);
} else {
if (pmixp_server_pp_same_thread()) {
if (pmixp_server_pp_count() ==
pmixp_server_pp_warmups()) {
pmixp_server_pp_start();
}
if (!pmixp_server_pp_check_fini(msize)) {
pmixp_server_pp_send(1, msize);
}
}
}
pmixp_server_pp_inc();
break;
}
#endif
case PMIXP_MSG_RING: {
pmixp_coll_t *coll = NULL;
pmix_proc_t *procs = NULL;
size_t nprocs = 0;
pmixp_coll_ring_msg_hdr_t ring_hdr;
pmixp_coll_type_t type = 0;
if (pmixp_coll_ring_unpack(buf, &type, &ring_hdr,
&procs, &nprocs)) {
char *nodename = pmixp_info_job_host(hdr->nodeid);
PMIXP_ERROR("Bad message header from node %s",
nodename);
xfree(nodename);
goto exit;
}
if (PMIXP_COLL_TYPE_FENCE_RING != type) {
char *nodename = pmixp_info_job_host(hdr->nodeid);
PMIXP_ERROR("Unexpected collective type=%s from node %s:%u, expected=%s",
pmixp_coll_type2str(type), nodename, hdr->nodeid,
pmixp_coll_type2str(PMIXP_COLL_TYPE_FENCE_RING));
xfree(nodename);
goto exit;
}
coll = pmixp_state_coll_get(type, procs, nprocs);
xfree(procs);
if (!coll) {
PMIXP_ERROR("Unable to pmixp_state_coll_get()");
break;
}
pmixp_coll_sanity_check(coll);
#ifdef PMIXP_COLL_DEBUG
PMIXP_DEBUG("%s collective message from nodeid=%u, contrib_id=%u, seq=%u, hop=%u, msgsize=%lu",
pmixp_coll_type2str(type),
hdr->nodeid, ring_hdr.contrib_id,
ring_hdr.seq, ring_hdr.hop_seq, ring_hdr.msgsize);
#endif
if (pmixp_coll_ring_check(coll, &ring_hdr)) {
char *nodename = pmixp_info_job_host(hdr->nodeid);
PMIXP_ERROR("%p: unexpected contrib from %s:%u, coll->seq=%d, seq=%d",
coll, nodename, hdr->nodeid,
coll->seq, hdr->seq);
xfree(nodename);
break;
}
pmixp_coll_ring_neighbor(coll, &ring_hdr, buf);
break;
}
default:
PMIXP_ERROR("Unknown message type %d", hdr->type);
break;
}
exit:
FREE_NULL_BUFFER(buf);
}
void pmixp_server_sent_buf_cb(int rc, pmixp_p2p_ctx_t ctx, void *data)
{
buf_t *buf = (buf_t *) data;
FREE_NULL_BUFFER(buf);
return;
}
int pmixp_server_send_nb(pmixp_ep_t *ep, pmixp_srv_cmd_t type,
uint32_t seq, buf_t *buf,
pmixp_server_sent_cb_t complete_cb,
void *cb_data)
{
pmixp_base_hdr_t bhdr;
int rc = SLURM_ERROR;
pmixp_dconn_t *dconn = NULL;
PMIXP_BASE_HDR_SETUP(bhdr, type, seq, buf);
/* if direct connection is not enabled
* always use Slurm protocol
*/
if (!pmixp_info_srv_direct_conn()) {
goto send_slurm;
}
switch (ep->type) {
case PMIXP_EP_HLIST:
goto send_slurm;
case PMIXP_EP_NOIDEID:{
int hostid;
hostid = ep->ep.nodeid;
xassert(0 <= hostid);
dconn = pmixp_dconn_lock(hostid);
switch (pmixp_dconn_state(dconn)) {
case PMIXP_DIRECT_EP_SENT:
case PMIXP_DIRECT_CONNECTED:
/* keep the lock here and proceed
* to the direct send
*/
goto send_direct;
case PMIXP_DIRECT_INIT:
pmixp_dconn_req_sent(dconn);
pmixp_dconn_unlock(dconn);
goto send_slurm;
default:{
/* this is a bug! */
pmixp_dconn_state_t state = pmixp_dconn_state(dconn);
pmixp_dconn_unlock(dconn);
PMIXP_ERROR("Bad direct connection state: %d",
(int)state);
xassert( (state == PMIXP_DIRECT_INIT) ||
(state == PMIXP_DIRECT_EP_SENT) ||
(state == PMIXP_DIRECT_CONNECTED) );
abort();
}
}
}
default:
PMIXP_ERROR("Bad value of the endpoint type: %d",
(int)ep->type);
xassert( PMIXP_EP_HLIST == ep->type ||
PMIXP_EP_NOIDEID == ep->type);
abort();
}
return rc;
send_slurm:
rc = _slurm_send(ep, bhdr, buf);
complete_cb(rc, PMIXP_P2P_INLINE, cb_data);
return SLURM_SUCCESS;
send_direct:
xassert( NULL != dconn );
_direct_send(dconn, ep, bhdr, buf, complete_cb, cb_data);
pmixp_dconn_unlock(dconn);
return SLURM_SUCCESS;
}
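/*
* Typical non-blocking send (a minimal sketch modeled on
* pmixp_server_pp_send() below; the nodeid and payload are made up):
*
*	pmixp_ep_t ep = { .type = PMIXP_EP_NOIDEID };
*	buf_t *buf = pmixp_server_buf_new();
*	ep.ep.nodeid = 1;
*	pack32(42, buf);
*	rc = pmixp_server_send_nb(&ep, PMIXP_MSG_PINGPONG, 0, buf,
*				  pmixp_server_sent_buf_cb, buf);
*
* Note that pmixp_server_sent_buf_cb() releases buf once the send
* completes, so the caller must not free it on success.
*/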
/*
* ------------------- Abort handling protocol -----------------------
*/
static int _abort_status = SLURM_SUCCESS;
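/*
* The abort protocol is a simple acknowledged status exchange: the stepd
* connects to the srun abort agent at
* pmixp_info_srun_ip():pmixp_info_abort_agent_port(), writes the abort
* status as a network-order uint32_t, and the agent echoes the value back
* as a confirmation (see pmixp_abort_propagate()/pmixp_abort_handle()).
*/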
void pmixp_abort_handle(void *tls_conn)
{
uint32_t status;
int len;
/* Receive the status from stepd */
len = slurm_read_stream(tls_conn, (char *) &status, sizeof(status));
if (len != sizeof(status)) {
PMIXP_ERROR("slurm_read_stream() failed: %m");
return;
}
/* Apply the received status */
if (!_abort_status) {
_abort_status = (int)ntohl(status);
}
/* Reply back to confirm that the status was processed */
len = slurm_write_stream(tls_conn, (char *) &status, sizeof(status));
if (len != sizeof(status)) {
PMIXP_ERROR("slurm_write_stream() failed: %m");
return;
}
}
void pmixp_abort_propagate(int status)
{
void *tls_conn = NULL;
uint32_t status_net = htonl((uint32_t)status);
int len;
slurm_addr_t abort_server;
if (!(pmixp_info_srun_ip()) || (pmixp_info_abort_agent_port() <= 0)) {
PMIXP_ERROR("Invalid abort agent connection address: %s:%d",
pmixp_info_srun_ip() ? pmixp_info_srun_ip(): "NULL",
pmixp_info_abort_agent_port());
return;
}
PMIXP_DEBUG("Connecting to abort agent: %s:%d",
pmixp_info_srun_ip(),
pmixp_info_abort_agent_port());
slurm_set_addr(&abort_server, pmixp_info_abort_agent_port(),
pmixp_info_srun_ip());
/* WARNING - this cannot use encryption currently */
if (!(tls_conn = slurm_open_msg_conn(&abort_server, NULL))) {
PMIXP_ERROR("slurm_open_stream() failed: %m");
PMIXP_ERROR("Connecting to abort agent failed: %s:%d",
pmixp_info_srun_ip(),
pmixp_info_abort_agent_port());
return;
}
len = slurm_write_stream(tls_conn, (char *) &status_net,
sizeof(status_net));
if (len != sizeof(status_net)) {
PMIXP_ERROR("slurm_write_stream() failed: %m");
PMIXP_ERROR("Communicating with abort agent failed: %s:%d",
pmixp_info_srun_ip(),
pmixp_info_abort_agent_port());
goto close_fd;
}
len = slurm_read_stream(tls_conn, (char *) &status_net,
sizeof(status_net));
if (len != sizeof(status_net)) {
PMIXP_ERROR("slurm_read_stream() failed: %m");
PMIXP_ERROR("Communicating with abort agent failed: %s:%d",
pmixp_info_srun_ip(),
pmixp_info_abort_agent_port());
goto close_fd;
}
xassert(status_net == htonl((uint32_t)status));
close_fd:
conn_g_destroy(tls_conn, true);
}
int pmixp_abort_code_get(void)
{
return _abort_status;
}
/*
* ------------------- DIRECT communication protocol -----------------------
*/
/* Size of the payload */
static uint32_t _direct_paysize(void *buf)
{
pmixp_base_hdr_t *hdr = (pmixp_base_hdr_t *)buf;
return hdr->msgsize;
}
/*
* Unpack message header.
* Returns 0 on success and -errno on failure
*/
static int _direct_hdr_unpack_portable(void *net, void *host)
{
pmixp_base_hdr_t *hdr = (pmixp_base_hdr_t *)host;
buf_t *packbuf = create_buf(net, PMIXP_BASE_HDR_SIZE);
if (_base_hdr_unpack_fixed(packbuf, hdr)) {
/* free packbuf, but not the memory it points to */
packbuf->head = NULL;
FREE_NULL_BUFFER(packbuf);
return -EINVAL;
}
/* free packbuf, but not the memory it points to */
packbuf->head = NULL;
FREE_NULL_BUFFER(packbuf);
return 0;
}
static int _direct_hdr_unpack_samearch(void *net, void *host)
{
return _base_hdr_unpack_fixed_samearch(net, host);
}
/*
* Pack message header. Returns packed size
*/
static size_t _direct_hdr_pack_portable(pmixp_base_hdr_t *hdr, void *net)
{
buf_t *buf = create_buf(net, PMIXP_BASE_HDR_MAX);
int size = 0;
_base_hdr_pack_full(buf, hdr);
size = get_buf_offset(buf);
xassert(size >= PMIXP_BASE_HDR_SIZE);
xassert(size <= PMIXP_BASE_HDR_MAX);
/* free packbuf, but not the memory it points to */
buf->head = NULL;
FREE_NULL_BUFFER(buf);
return size;
}
static size_t _direct_hdr_pack_samearch(pmixp_base_hdr_t *hdr, void *net)
{
return _base_hdr_pack_full_samearch(hdr, net);
}
/* Get the pointer to the message buffer */
static void *_direct_msg_ptr(void *msg)
{
_direct_proto_message_t *_msg = (_direct_proto_message_t*)msg;
return _msg->buffer;
}
/* Message size */
static size_t _direct_msg_size(void *msg)
{
_direct_proto_message_t *_msg = (_direct_proto_message_t*)msg;
return (_msg->hdr.msgsize + PMIXP_BASE_HDR_SIZE);
}
/* Release message.
* TODO: We need to fix that: I/O engine needs a way
* to provide the error code
*/
static void _direct_send_complete(void *_msg, pmixp_p2p_ctx_t ctx, int rc)
{
_direct_proto_message_t *msg = (_direct_proto_message_t*)_msg;
msg->sent_cb(rc, ctx, msg->cbdata);
xfree(msg);
}
/*
* TODO: merge with _direct_new_msg_conn as they have nearly identical
* functionality. This one additionally handles the extended I/O header.
*/
static void _direct_new_msg(void *_hdr, buf_t *buf)
{
pmixp_base_hdr_t *hdr = (pmixp_base_hdr_t*)_hdr;
if (hdr->ext_flag) {
/* Extra information was incorporated into this message.
* This should be an endpoint data
*/
_process_extended_hdr(hdr, buf);
}
_process_server_request(hdr, buf);
}
/*
* See process_handler_t prototype description
* on the details of this function output values
*/
static void _direct_new_msg_conn(pmixp_conn_t *conn, void *_hdr, void *msg)
{
pmixp_base_hdr_t *hdr = (pmixp_base_hdr_t*)_hdr;
buf_t *buf = create_buf(msg, hdr->msgsize);
_process_server_request(hdr, buf);
}
/* Process direct connection closure
*/
static void _direct_return_connection(pmixp_conn_t *conn)
{
pmixp_dconn_t *dconn = (pmixp_dconn_t *)pmixp_conn_get_data(conn);
pmixp_dconn_lock(dconn->nodeid);
pmixp_dconn_disconnect(dconn);
pmixp_dconn_unlock(dconn);
}
/*
* Receive the first message identifying the initiator
*/
static void
_direct_conn_establish(pmixp_conn_t *conn, void *_hdr, void *msg)
{
pmixp_io_engine_t *eng = pmixp_conn_get_eng(conn);
pmixp_base_hdr_t *hdr = (pmixp_base_hdr_t *)_hdr;
pmixp_dconn_t *dconn = NULL;
pmixp_conn_t *new_conn;
eio_obj_t *obj;
int fd = pmixp_io_detach(eng);
char *ep_data = NULL;
uint32_t ep_len = 0;
buf_t *buf_msg;
int rc;
char *nodename = NULL;
uid_t uid = SLURM_AUTH_NOBODY;
if (!hdr->ext_flag) {
nodename = pmixp_info_job_host(hdr->nodeid);
PMIXP_ERROR("Connection failed from %u(%s)",
hdr->nodeid, nodename);
xfree(nodename);
close(fd);
return;
}
buf_msg = create_buf(msg, hdr->msgsize);
/* Retrieve endpoint information */
rc = _base_hdr_unpack_ext(buf_msg, &ep_data, &ep_len);
if (rc) {
FREE_NULL_BUFFER(buf_msg);
close(fd);
nodename = pmixp_info_job_host(hdr->nodeid);
PMIXP_ERROR("Failed to unpack the direct connection message from %u(%s)",
hdr->nodeid, nodename);
xfree(nodename);
return;
}
/* Unpack and verify the auth credential */
rc = _auth_cred_verify(buf_msg, &uid);
FREE_NULL_BUFFER(buf_msg);
if (rc) {
close(fd);
nodename = pmixp_info_job_host(hdr->nodeid);
PMIXP_ERROR("Connection reject from %u(%s)",
hdr->nodeid, nodename);
xfree(nodename);
return;
}
dconn = pmixp_dconn_accept(hdr->nodeid, fd);
if (!dconn) {
/* the connection was refused because we already
* have an established connection.
* It seems that some sort of race condition occurred
*/
close(fd);
nodename = pmixp_info_job_host(hdr->nodeid);
PMIXP_ERROR("Failed to accept direct connection from %u(%s)",
hdr->nodeid, nodename);
xfree(nodename);
return;
}
dconn->uid = uid;
new_conn = pmixp_conn_new_persist(PMIXP_PROTO_DIRECT,
pmixp_dconn_engine(dconn),
_direct_new_msg_conn,
_direct_return_connection, dconn);
pmixp_dconn_unlock(dconn);
obj = eio_obj_create(fd, &direct_peer_ops, (void *)new_conn);
eio_new_obj(pmixp_info_io(), obj);
/* wakeup this connection to get processed */
eio_signal_wakeup(pmixp_info_io());
}
void pmixp_server_direct_conn(int fd)
{
eio_obj_t *obj;
pmixp_conn_t *conn;
PMIXP_DEBUG("Request from fd = %d", fd);
/* Set nonblocking */
fd_set_nonblocking(fd);
pmixp_fd_set_nodelay(fd);
conn = pmixp_conn_new_temp(PMIXP_PROTO_DIRECT, fd,
_direct_conn_establish);
/* try to process right here */
pmixp_conn_progress_rcv(conn);
if (!pmixp_conn_is_alive(conn)) {
/* success, don't need this connection anymore */
pmixp_conn_return(conn);
return;
}
/* If it is a blocking operation: create an eio object to
* handle it */
obj = eio_obj_create(fd, &direct_peer_ops, (void *)conn);
eio_new_obj(pmixp_info_io(), obj);
/* wakeup this connection to get processed */
eio_signal_wakeup(pmixp_info_io());
}
static void _direct_send(pmixp_dconn_t *dconn, pmixp_ep_t *ep,
pmixp_base_hdr_t bhdr, buf_t *buf,
pmixp_server_sent_cb_t complete_cb, void *cb_data)
{
char nhdr[PMIXP_BASE_HDR_SIZE];
size_t dsize = 0, hsize = 0;
int rc;
hsize = _direct_hdr_pack(&bhdr, nhdr);
xassert(PMIXP_EP_NOIDEID == ep->type);
/* TODO: I think we can avoid locking */
_direct_proto_message_t *msg = xmalloc(sizeof(*msg));
msg->sent_cb = complete_cb;
msg->cbdata = cb_data;
msg->hdr = bhdr;
msg->buffer = _buf_finalize(buf, nhdr, hsize, &dsize);
msg->buf_ptr = buf;
rc = pmixp_dconn_send(dconn, msg);
if (SLURM_SUCCESS != rc) {
msg->sent_cb(rc, PMIXP_P2P_INLINE, msg->cbdata);
xfree( msg );
}
}
int pmixp_server_direct_conn_early(void)
{
pmixp_coll_type_t types[] = { PMIXP_COLL_TYPE_FENCE_TREE, PMIXP_COLL_TYPE_FENCE_RING };
pmixp_coll_type_t type = pmixp_info_srv_fence_coll_type();
pmixp_coll_t *coll[PMIXP_COLL_TYPE_FENCE_MAX] = { NULL };
int i, rc, count = 0;
pmix_proc_t proc;
PMIXP_DEBUG("called");
proc.rank = pmixp_lib_get_wildcard();
strlcpy(proc.nspace, _pmixp_job_info.nspace, sizeof(proc.nspace));
for (i=0; i < sizeof(types)/sizeof(types[0]); i++){
if (type != PMIXP_COLL_TYPE_FENCE_MAX && type != types[i]) {
continue;
}
coll[count++] = pmixp_state_coll_get(types[i], &proc, 1);
}
/* use Tree algo by default */
if (!count) {
coll[count++] = pmixp_state_coll_get(PMIXP_COLL_TYPE_FENCE_TREE, &proc, 1);
}
for (i = 0; i < count; i++) {
if (coll[i]) {
pmixp_ep_t ep = {0};
buf_t *buf;
ep.type = PMIXP_EP_NOIDEID;
switch (coll[i]->type) {
case PMIXP_COLL_TYPE_FENCE_TREE:
ep.ep.nodeid = coll[i]->state.tree.prnt_peerid;
if (ep.ep.nodeid < 0) {
/* this is the root node; it has no
* parent node to connect to early */
continue;
}
break;
case PMIXP_COLL_TYPE_FENCE_RING:
/* calculate the id of the next ring neighbor */
ep.ep.nodeid = (coll[i]->my_peerid + 1) %
coll[i]->peers_cnt;
break;
default:
PMIXP_ERROR("Unknown coll type");
return SLURM_ERROR;
}
buf = pmixp_server_buf_new();
rc = pmixp_server_send_nb(
&ep, PMIXP_MSG_INIT_DIRECT, coll[i]->seq,
buf, pmixp_server_sent_buf_cb, buf);
if (SLURM_SUCCESS != rc) {
PMIXP_ERROR_STD("send init msg error");
return SLURM_ERROR;
}
}
}
return SLURM_SUCCESS;
}
/*
* ------------------- Slurm communication protocol -----------------------
*/
/*
* See process_handler_t prototype description
* on the details of this function output values
*/
static void _slurm_new_msg(pmixp_conn_t *conn,
void *_hdr, void *msg)
{
pmixp_slurm_rhdr_t *hdr = (pmixp_slurm_rhdr_t *)_hdr;
buf_t *buf_msg = create_buf(msg, hdr->shdr.msgsize);
if (hdr->shdr.ext_flag ) {
/* Extra information was incorporated into this message.
* This should be an endpoint data
*/
_process_extended_hdr(&hdr->shdr, buf_msg);
}
_process_server_request(&hdr->shdr, buf_msg);
}
/*
* TODO: we need to keep track of the "me"
* structures created here, because we need to
* free them in "pmixp_stepd_finalize"
*/
void pmixp_server_slurm_conn(int fd)
{
eio_obj_t *obj;
pmixp_conn_t *conn = NULL;
PMIXP_DEBUG("Request from fd = %d", fd);
pmixp_debug_hang(0);
/* Set nonblocking */
fd_set_nonblocking(fd);
conn = pmixp_conn_new_temp(PMIXP_PROTO_SLURM, fd, _slurm_new_msg);
/* try to process right here */
pmixp_conn_progress_rcv(conn);
if (!pmixp_conn_is_alive(conn)) {
/* success, don't need this connection anymore */
pmixp_conn_return(conn);
return;
}
/* If it is a blocking operation: create an eio object to
* handle it */
obj = eio_obj_create(fd, &slurm_peer_ops, (void *)conn);
eio_new_obj(pmixp_info_io(), obj);
}
/*
* Server message processing
*/
static uint32_t _slurm_proto_msize(void *buf)
{
pmixp_slurm_rhdr_t *ptr = (pmixp_slurm_rhdr_t *)buf;
pmixp_base_hdr_t *hdr = &ptr->shdr;
xassert(ptr->size == hdr->msgsize + PMIXP_BASE_HDR_SIZE);
xassert(hdr->magic == PMIXP_SERVER_MSG_MAGIC);
return hdr->msgsize;
}
/*
* Pack message header.
* Returns packed size
* Note: asymmetric to _recv_unpack_hdr because of additional Slurm header
*/
static int _slurm_pack_hdr(pmixp_base_hdr_t *hdr, void *net)
{
buf_t *buf = create_buf(net, PMIXP_BASE_HDR_MAX);
int size = 0;
_base_hdr_pack_full(buf, hdr);
size = get_buf_offset(buf);
/* free packbuf, but not the memory it points to */
buf->head = NULL;
FREE_NULL_BUFFER(buf);
return size;
}
/*
* Unpack message header.
* Returns 0 on success and -errno on failure
* Note: asymmetric to _send_pack_hdr because of additional Slurm header
*/
static int _slurm_proto_unpack_hdr(void *net, void *host)
{
pmixp_slurm_rhdr_t *rhdr = (pmixp_slurm_rhdr_t *)host;
buf_t *packbuf = create_buf(net, PMIXP_SAPI_RECV_HDR_SIZE);
if (_sapi_rhdr_unpack_fixed(packbuf, rhdr)) {
/* free packbuf, but not the memory it points to */
packbuf->head = NULL;
FREE_NULL_BUFFER(packbuf);
return -EINVAL;
}
/* free packbuf, but not the memory it points to */
packbuf->head = NULL;
FREE_NULL_BUFFER(packbuf);
return 0;
}
static int _slurm_send(pmixp_ep_t *ep, pmixp_base_hdr_t bhdr, buf_t *buf)
{
const char *addr = NULL, *data = NULL, *hostlist = NULL;
char nhdr[PMIXP_BASE_HDR_MAX];
size_t hsize = 0, dsize = 0;
int rc;
/* setup the header */
addr = pmixp_info_srv_usock_path();
bhdr.ext_flag = 0;
if (pmixp_info_srv_direct_conn() && PMIXP_EP_NOIDEID == ep->type) {
bhdr.ext_flag = 1;
}
hsize = _slurm_pack_hdr(&bhdr, nhdr);
data = _buf_finalize(buf, nhdr, hsize, &dsize);
switch( ep->type ){
case PMIXP_EP_HLIST:
hostlist = ep->ep.hostlist;
rc = pmixp_stepd_send(ep->ep.hostlist, addr,
data, dsize, 500, 7, 0);
break;
case PMIXP_EP_NOIDEID: {
char *nodename = pmixp_info_job_host(ep->ep.nodeid);
char *address = slurm_conf_expand_slurmd_path(addr,
nodename,
nodename);
rc = pmixp_p2p_send(nodename, address, data, dsize,
500, 7, 0);
xfree(address);
xfree(nodename);
break;
}
default:
PMIXP_ERROR("Bad value of the EP type: %d", (int)ep->type);
abort();
}
if (SLURM_SUCCESS != rc) {
PMIXP_ERROR("Cannot send message to %s, size = %u, "
"hostlist:\n%s",
addr, (uint32_t) dsize, hostlist);
}
return rc;
}
/*
* ------------------- communication DEBUG tools -----------------------
*/
#ifndef NDEBUG
/*
* This is solely debug code that helps to estimate
* the performance of the communication subsystem
* of the plugin
*/
static pthread_mutex_t _pmixp_pp_lock;
#define PMIXP_PP_PWR2_MIN 0
#define PMIXP_PP_PWR2_MAX 24
static bool _pmixp_pp_on = false;
static bool _pmixp_pp_same_thr = false;
static int _pmixp_pp_low = PMIXP_PP_PWR2_MIN;
static int _pmixp_pp_up = PMIXP_PP_PWR2_MAX;
static int _pmixp_pp_bound = 10;
static int _pmixp_pp_siter = 1000;
static int _pmixp_pp_liter = 100;
static volatile int _pmixp_pp_count = 0;
#include <time.h>
#define GET_TS() ({ \
struct timespec ts; \
double ret = 0; \
clock_gettime(CLOCK_MONOTONIC, &ts); \
ret = ts.tv_sec + 1E-9*ts.tv_nsec; \
ret; \
})
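/*
* Usage sketch for GET_TS():
*
*	double t0 = GET_TS();
*	... code under measurement ...
*	double elapsed = GET_TS() - t0;	(in seconds)
*/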
static volatile int _pmixp_pp_warmup = 0;
static volatile int _pmixp_pp_iters = 0;
static double _pmixp_pp_start = 0;
int pmixp_server_pp_count(void)
{
return _pmixp_pp_count;
}
int pmixp_server_pp_warmups(void)
{
return _pmixp_pp_warmup;
}
void pmixp_server_pp_inc(void)
{
_pmixp_pp_count++;
}
int pmixp_server_pp_same_thread(void)
{
return _pmixp_pp_same_thr;
}
void pmixp_server_pp_start(void)
{
_pmixp_pp_start = GET_TS();
}
bool pmixp_server_pp_check_fini(int size)
{
if ( (pmixp_server_pp_count() + 1) >=
(_pmixp_pp_warmup + _pmixp_pp_iters)){
slurm_mutex_lock(&_pmixp_pp_lock);
PMIXP_ERROR("latency: %d - %.9lf", size,
(GET_TS() - _pmixp_pp_start) / _pmixp_pp_iters );
slurm_mutex_unlock(&_pmixp_pp_lock);
return true;
}
return false;
}
static bool _consists_from_digits(char *s)
{
return (strspn(s, "0123456789") == strlen(s));
}
void pmixp_server_init_pp(char ***env)
{
char *env_ptr = NULL;
int tmp_int;
slurm_mutex_init(&_pmixp_pp_lock);
/* check if we want to run ping-pong */
if (!(env_ptr = getenvp(*env, PMIXP_PP_ON))) {
return;
}
if (!xstrcmp("1", env_ptr) || !xstrcmp("true", env_ptr)) {
_pmixp_pp_on = true;
}
if ((env_ptr = getenvp(*env, PMIXP_PP_SAMETHR))) {
if (!xstrcmp("1", env_ptr) || !xstrcmp("true", env_ptr)) {
_pmixp_pp_same_thr = true;
}
}
if ((env_ptr = getenvp(*env, PMIXP_PP_LOW))) {
if (_consists_from_digits(env_ptr)) {
tmp_int = atoi(env_ptr);
_pmixp_pp_low = tmp_int < PMIXP_PP_PWR2_MAX ?
tmp_int : PMIXP_PP_PWR2_MAX;
}
}
if ((env_ptr = getenvp(*env, PMIXP_PP_UP))) {
if (_consists_from_digits(env_ptr)) {
tmp_int = atoi(env_ptr);
_pmixp_pp_up = tmp_int < PMIXP_PP_PWR2_MAX ?
tmp_int : PMIXP_PP_PWR2_MAX;
}
}
if ((env_ptr = getenvp(*env, PMIXP_PP_SITER))) {
if (_consists_from_digits(env_ptr)) {
_pmixp_pp_siter = atoi(env_ptr);
}
}
if ((env_ptr = getenvp(*env, PMIXP_PP_LITER))) {
if (_consists_from_digits(env_ptr)) {
_pmixp_pp_liter = atoi(env_ptr);
}
}
if ((env_ptr = getenvp(*env, PMIXP_PP_BOUND))) {
if (_consists_from_digits(env_ptr)) {
_pmixp_pp_bound = atoi(env_ptr);
}
}
}
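/*
* Semantics of the knobs parsed above (derived from
* pmixp_server_run_pp() below): message sizes sweep the powers of two
* from 2^_pmixp_pp_low to 2^_pmixp_pp_up bytes; sizes of
* 2^_pmixp_pp_bound and above use _pmixp_pp_liter iterations, smaller
* ones use _pmixp_pp_siter. For example, the defaults (low=0, up=24,
* bound=10) sweep 1 byte .. 16 MiB with 1000 iterations below 1 KiB and
* 100 from 1 KiB onward.
*/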
bool pmixp_server_want_pp(void)
{
return _pmixp_pp_on;
}
/*
* For this to work, the following conditions are supposed to be
* satisfied:
* - Slurm has to be configured with the `--enable-debug` option
* - the job step needs to have at least two nodes
* In this case the communication exchange will be done between
* the first two nodes.
*/
void pmixp_server_run_pp(void)
{
int i;
size_t start, end, bound;
/* the ping is initiated by nodeid == 0;
* all the rest just exit
*/
if (pmixp_info_nodeid()) {
return;
}
start = 1 << _pmixp_pp_low;
end = 1 << _pmixp_pp_up;
bound = 1 << _pmixp_pp_bound;
for (i = start; i <= end; i *= 2) {
int count, iters = _pmixp_pp_siter;
struct timeval tv1, tv2;
double time;
if (i >= bound) {
iters = _pmixp_pp_liter;
}
if (!_pmixp_pp_same_thr) {
/* warmup - 10% of the iteration count */
count = pmixp_server_pp_count() + iters/10;
while (pmixp_server_pp_count() < count) {
int cur_count = pmixp_server_pp_count();
pmixp_server_pp_send(1, i);
while (cur_count == pmixp_server_pp_count()) {
usleep(1);
}
}
count = pmixp_server_pp_count() + iters;
gettimeofday(&tv1, NULL);
while (pmixp_server_pp_count() < count) {
int cur_count = pmixp_server_pp_count();
/* Send the message to the (nodeid == 1) */
pmixp_server_pp_send(1, i);
/* wait for the response */
while (cur_count == pmixp_server_pp_count());
}
gettimeofday(&tv2, NULL);
time = tv2.tv_sec + 1E-6 * tv2.tv_usec -
(tv1.tv_sec + 1E-6 * tv1.tv_usec);
/* Output measurements to the slurmd.log */
PMIXP_ERROR("latency: %d - %.9lf", i, time / iters );
} else {
int count = iters + iters/10;
slurm_mutex_lock(&_pmixp_pp_lock);
_pmixp_pp_warmup = iters/10;
_pmixp_pp_iters = iters;
_pmixp_pp_count = 0;
slurm_mutex_unlock(&_pmixp_pp_lock);
/* initiate sends */
pmixp_server_pp_send(1, i);
while (pmixp_server_pp_count() < count){
sched_yield();
}
}
}
}
struct pp_cbdata
{
buf_t *buf;
double start;
int size;
};
void pingpong_complete(int rc, pmixp_p2p_ctx_t ctx, void *data)
{
struct pp_cbdata *d = (struct pp_cbdata*)data;
FREE_NULL_BUFFER(d->buf);
xfree(data);
// PMIXP_ERROR("Send complete: %d %lf", d->size, GET_TS - d->start);
}
int pmixp_server_pp_send(int nodeid, int size)
{
buf_t *buf = pmixp_server_buf_new();
int rc;
pmixp_ep_t ep;
struct pp_cbdata *cbdata = xmalloc(sizeof(*cbdata));
grow_buf(buf, size);
ep.type = PMIXP_EP_NOIDEID;
ep.ep.nodeid = nodeid;
cbdata->buf = buf;
cbdata->size = size;
set_buf_offset(buf,get_buf_offset(buf) + size);
rc = pmixp_server_send_nb(&ep, PMIXP_MSG_PINGPONG,
_pmixp_pp_count, buf, pingpong_complete,
(void*)cbdata);
if (SLURM_SUCCESS != rc) {
char *nodename = pmixp_info_job_host(nodeid);
PMIXP_ERROR("Was unable to wait for the parent %s to "
"become alive",
nodename);
xfree(nodename);
}
return rc;
}
#define PMIXP_CPERF_PWR2_MIN 0
#define PMIXP_CPERF_PWR2_MAX 20
static bool _pmixp_cperf_on = false;
static int _pmixp_cperf_low = PMIXP_CPERF_PWR2_MIN;
static int _pmixp_cperf_up = PMIXP_CPERF_PWR2_MAX;
static int _pmixp_cperf_bound = 10;
static int _pmixp_cperf_siter = 1000;
static int _pmixp_cperf_liter = 100;
static volatile int _pmixp_cperf_count = 0;
static int _pmixp_server_cperf_count()
{
return _pmixp_cperf_count;
}
static void _pmixp_server_cperf_inc()
{
_pmixp_cperf_count++;
}
void pmixp_server_init_cperf(char ***env)
{
char *env_ptr = NULL;
int tmp_int;
slurm_mutex_init(&_pmixp_pp_lock);
/* check if we want to run the collective performance test */
if (!(env_ptr = getenvp(*env, PMIXP_CPERF_ON))) {
return;
}
if (!strcmp("1", env_ptr) || !strcmp("true", env_ptr)) {
_pmixp_cperf_on = true;
}
if ((env_ptr = getenvp(*env, PMIXP_CPERF_LOW))) {
if (_consists_from_digits(env_ptr)) {
tmp_int = atoi(env_ptr);
_pmixp_cperf_low = tmp_int < PMIXP_CPERF_PWR2_MAX ?
tmp_int : PMIXP_CPERF_PWR2_MAX;
}
}
if ((env_ptr = getenvp(*env, PMIXP_CPERF_UP))) {
if (_consists_from_digits(env_ptr)) {
tmp_int = atoi(env_ptr);
_pmixp_cperf_up = tmp_int < PMIXP_CPERF_PWR2_MAX ?
tmp_int : PMIXP_CPERF_PWR2_MAX;
}
}
if ((env_ptr = getenvp(*env, PMIXP_CPERF_SITER))) {
if (_consists_from_digits(env_ptr)) {
_pmixp_cperf_siter = atoi(env_ptr);
}
}
if ((env_ptr = getenvp(*env, PMIXP_CPERF_LITER))) {
if (_consists_from_digits(env_ptr)) {
_pmixp_cperf_liter = atoi(env_ptr);
}
}
if ((env_ptr = getenvp(*env, PMIXP_CPERF_BOUND))) {
if (_consists_from_digits(env_ptr)) {
_pmixp_cperf_bound = atoi(env_ptr);
}
}
}
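/*
* These knobs mirror the ping-pong ones: collective payload sizes sweep
* 2^_pmixp_cperf_low .. 2^_pmixp_cperf_up bytes (1 byte .. 1 MiB by
* default), with _pmixp_cperf_siter iterations below 2^_pmixp_cperf_bound
* and _pmixp_cperf_liter at or above it (see pmixp_server_run_cperf()).
*/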
bool pmixp_server_want_cperf(void)
{
return _pmixp_cperf_on;
}
inline static void _pmixp_cperf_cbfunc(pmixp_coll_t *coll,
void *r_fn, void *r_cbdata)
{
/*
* we will be called with the mutex locked.
* We need to unlock it so that the callback won't
* deadlock
*/
slurm_mutex_unlock(&coll->lock);
/* invoke the callback */
pmixp_lib_release_invoke(r_fn, r_cbdata);
/* lock it back before proceed */
slurm_mutex_lock(&coll->lock);
/* go to the next iteration */
_pmixp_server_cperf_inc();
}
static void _pmixp_cperf_tree_cbfunc(int status, const char *data,
size_t ndata, void *cbdata,
void *r_fn, void *r_cbdata)
{
/* small violation - we know the type of the release
* data and will use that knowledge to avoid the deadlock
*/
pmixp_coll_t *coll = pmixp_coll_tree_from_cbdata(r_cbdata);
xassert(SLURM_SUCCESS == status);
_pmixp_cperf_cbfunc(coll, r_fn, r_cbdata);
}
static void _pmixp_cperf_ring_cbfunc(int status, const char *data,
size_t ndata, void *cbdata,
void *r_fn, void *r_cbdata)
{
/* small violation - we know the type of the release
* data and will use that knowledge to avoid the deadlock
*/
pmixp_coll_t *coll = pmixp_coll_ring_from_cbdata(r_cbdata);
xassert(SLURM_SUCCESS == status);
_pmixp_cperf_cbfunc(coll, r_fn, r_cbdata);
}
typedef void (*pmixp_cperf_cbfunc_fn_t)(int status, const char *data,
size_t ndata, void *cbdata,
void *r_fn, void *r_cbdata);
static int _pmixp_server_cperf_iter(pmixp_coll_type_t type, char *data, int ndata)
{
pmixp_coll_t *coll;
pmix_proc_t procs;
int cur_count = _pmixp_server_cperf_count();
pmixp_cperf_cbfunc_fn_t cperf_cbfunc = _pmixp_cperf_tree_cbfunc;
strlcpy(procs.nspace, pmixp_info_namespace(), sizeof(procs.nspace));
procs.rank = pmixp_lib_get_wildcard();
switch (type) {
case PMIXP_COLL_TYPE_FENCE_RING:
cperf_cbfunc = _pmixp_cperf_ring_cbfunc;
break;
case PMIXP_COLL_TYPE_FENCE_TREE:
cperf_cbfunc = _pmixp_cperf_tree_cbfunc;
break;
default:
PMIXP_ERROR("Uncnown coll type");
return SLURM_ERROR;
}
coll = pmixp_state_coll_get(type, &procs, 1);
pmixp_coll_sanity_check(coll);
xassert(!pmixp_coll_contrib_local(coll, type, data, ndata,
cperf_cbfunc, NULL));
while (cur_count == _pmixp_server_cperf_count()) {
usleep(1);
}
return SLURM_SUCCESS;
}
/*
* For this to work, the following conditions are supposed to be
* satisfied:
* - Slurm has to be configured with the `--enable-debug` option
* - the job step needs to have at least two nodes
* In this case the communication exchange will be done between
* the first two nodes.
*/
void pmixp_server_run_cperf(void)
{
int size;
size_t start, end, bound;
pmixp_coll_type_t type;
pmixp_coll_type_t types[] = { PMIXP_COLL_TYPE_FENCE_TREE, PMIXP_COLL_TYPE_FENCE_RING };
pmixp_coll_cperf_mode_t mode = pmixp_info_srv_fence_coll_type();
bool is_barrier = pmixp_info_srv_fence_coll_barrier();
int rc = SLURM_SUCCESS;
pmixp_debug_hang(0);
if (!is_barrier) {
start = 1 << _pmixp_cperf_low;
end = 1 << _pmixp_cperf_up;
bound = 1 << _pmixp_cperf_bound;
} else {
start = 0;
end = 1;
bound = 1;
}
PMIXP_ERROR("coll perf mode=%s", pmixp_coll_cperf_mode2str(mode));
for (size = start; size <= end; size *= 2) {
int j, iters = _pmixp_cperf_siter;
struct timeval tv1, tv2;
if (size >= bound) {
iters = _pmixp_cperf_liter;
}
double times[iters];
char *data = xmalloc(size);
PMIXP_ERROR("coll perf %d", size);
memset(times, 0, (sizeof(double) * iters));
for(j=0; j<iters && !rc; j++){
switch (mode) {
case PMIXP_COLL_CPERF_MIXED:
type = types[j%(sizeof(types)/sizeof(types[0]))];
break;
case PMIXP_COLL_CPERF_RING:
type = PMIXP_COLL_TYPE_FENCE_RING;
break;
case PMIXP_COLL_CPERF_TREE:
type = PMIXP_COLL_TYPE_FENCE_TREE;
break;
default:
type = PMIXP_COLL_TYPE_FENCE_RING;
break;
}
gettimeofday(&tv1, NULL);
rc = _pmixp_server_cperf_iter(type, data, size);
gettimeofday(&tv2, NULL);
times[j] = tv2.tv_sec + 1E-6 * tv2.tv_usec -
(tv1.tv_sec + 1E-6 * tv1.tv_usec);
}
for(j=0; j<iters; j++){
/* Output measurements to the slurmd.log */
PMIXP_ERROR("\t%d %d: %.9lf", j, size, times[j]);
}
xfree(data);
if (is_barrier) {
break;
}
}
}
#endif // NDEBUG