| /*****************************************************************************\ |
** pmixp_dmdx.c - PMIx direct modex (DMDX) exchange between nodes
| ***************************************************************************** |
| * Copyright (C) 2014-2015 Artem Polyakov. All rights reserved. |
| * Copyright (C) 2015-2017 Mellanox Technologies. All rights reserved. |
| * Written by Artem Polyakov <artpol84@gmail.com, artemp@mellanox.com>. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #include "pmixp_common.h" |
| #include "pmixp_dmdx.h" |
| #include "pmixp_server.h" |
| #include "pmixp_client.h" |
| |
/* set default direct modex timeout to 10 sec */
/*
 * NOTE(review): not referenced in this file; pmixp_dmdx_timeout_cleanup()
 * uses pmixp_info_timeout() instead.
 */
#define DMDX_DEFAULT_TIMEOUT 10

/* Kind of a DMDX message; packed as the first (8-bit) header field. */
typedef enum {
	DMDX_REQUEST = 1,	/* ask the owning node for modex data */
	DMDX_RESPONSE = 2	/* modex data (or an error status) sent back */
} dmdx_type_t;
| |
| typedef struct { |
| uint32_t seq_num; |
| time_t ts; |
| #ifndef NDEBUG |
| /* we need this only for verification */ |
| pmix_nspace_t nspace; |
| int rank; |
| #endif |
| void *cbfunc; |
| void *cbdata; |
| } dmdx_req_info_t; |
| |
| typedef struct { |
| uint32_t seq_num; |
| pmix_proc_t proc; |
| char *sender_ns; |
| int sender_nodeid; |
| int rank; |
| } dmdx_caddy_t; |
| |
| void _dmdx_free_caddy(dmdx_caddy_t *caddy) |
| { |
| if (NULL == caddy) { |
| /* nothing to do */ |
| return; |
| } |
| if (NULL != caddy->sender_ns) { |
| xfree(caddy->sender_ns); |
| } |
| xfree(caddy); |
| } |
| |
| static list_t *_dmdx_requests; |
| static uint32_t _dmdx_seq_num = 1; |
| |
| static void _respond_with_error(int seq_num, int nodeid, |
| char *sender_ns, int status); |
| |
| int pmixp_dmdx_init(void) |
| { |
| _dmdx_requests = list_create(xfree_ptr); |
| _dmdx_seq_num = 1; |
| return SLURM_SUCCESS; |
| } |
| |
| int pmixp_dmdx_finalize(void) |
| { |
| FREE_NULL_LIST(_dmdx_requests); |
| return 0; |
| } |
| |
| |
| static void _setup_header(buf_t *buf, dmdx_type_t t, |
| const char *nspace, int rank, int status) |
| { |
| char *str; |
| /* 1. pack message type */ |
| unsigned char type = (char)t; |
| pack8(type, buf); |
| |
| /* 2. pack namespace _with_ '\0' (strlen(nspace) + 1)! */ |
| packmem((char *)nspace, strlen(nspace) + 1, buf); |
| |
| /* 3. pack rank */ |
| pack32((uint32_t)rank, buf); |
| |
| /* 4. pack my rendezvous point - local namespace |
| * ! _with_ '\0' (strlen(nspace) + 1) ! */ |
| str = pmixp_info_namespace(); |
| packmem(str, strlen(str) + 1, buf); |
| |
| /* 5. pack the status */ |
| pack32((uint32_t)status, buf); |
| } |
| |
| static int _read_type(buf_t *buf, dmdx_type_t *type) |
| { |
| unsigned char t; |
| int rc; |
| /* 1. unpack message type */ |
| if (SLURM_SUCCESS != (rc = unpack8(&t, buf))) { |
| PMIXP_ERROR("Cannot unpack message type!"); |
| return SLURM_ERROR; |
| } |
| *type = (dmdx_type_t)t; |
| return SLURM_SUCCESS; |
| } |
| |
| static int _read_info(buf_t *buf, char **ns, int *rank, |
| char **sender_ns, int *status) |
| { |
| uint32_t cnt, uint32_tmp; |
| int rc; |
| *ns = NULL; |
| *sender_ns = NULL; |
| |
| /* 1. unpack namespace */ |
| if (SLURM_SUCCESS != (rc = unpackmem_ptr(ns, &cnt, buf))) { |
| PMIXP_ERROR("Cannot unpack requested namespace!"); |
| return rc; |
| } |
| /* We supposed to unpack a whole null-terminated string (with '\0')! |
| * (*ns)[cnt] = '\0'; |
| */ |
| |
| /* 2. unpack rank */ |
| if (SLURM_SUCCESS != (rc = unpack32(&uint32_tmp, buf))) { |
| PMIXP_ERROR("Cannot unpack requested rank!"); |
| return rc; |
| } |
| *rank = uint32_tmp; |
| |
| if (SLURM_SUCCESS != (rc = unpackmem_ptr(sender_ns, &cnt, buf))) { |
| PMIXP_ERROR("Cannot unpack sender namespace!"); |
| return rc; |
| } |
| /* We supposed to unpack a whole null-terminated string (with '\0')! |
| * (*sender_ns)[cnt] = '\0'; |
| */ |
| |
| /* 4. unpack status */ |
| if (SLURM_SUCCESS != (rc = unpack32(&uint32_tmp, buf))) { |
| PMIXP_ERROR("Cannot unpack rank!"); |
| return rc; |
| } |
| *status = uint32_tmp; |
| return SLURM_SUCCESS; |
| } |
| |
| static void _respond_with_error(int seq_num, int nodeid, |
| char *sender_ns, int status) |
| { |
| buf_t *buf = create_buf(NULL, 0); |
| pmixp_ep_t ep; |
| int rc; |
| |
| ep.type = PMIXP_EP_NOIDEID; |
| ep.ep.nodeid = nodeid; |
| |
| /* rank doesn't matter here, don't send it */ |
| _setup_header(buf, DMDX_RESPONSE, sender_ns, -1, status); |
| |
| /* send response */ |
| rc = pmixp_server_send_nb(&ep, PMIXP_MSG_DMDX, seq_num, buf, |
| pmixp_server_sent_buf_cb, buf); |
| if (SLURM_SUCCESS != rc) { |
| char *nodename = pmixp_info_job_host(nodeid); |
| PMIXP_ERROR("Cannot send direct modex error response to %s", |
| nodename); |
| xfree(nodename); |
| } |
| } |
| |
| static void _dmdx_pmix_cb(int status, char *data, size_t sz, |
| void *cbdata) |
| { |
| dmdx_caddy_t *caddy = (dmdx_caddy_t *)cbdata; |
| buf_t *buf = pmixp_server_buf_new(); |
| pmixp_ep_t ep; |
| int rc; |
| |
| /* setup response header */ |
| _setup_header(buf, DMDX_RESPONSE, caddy->proc.nspace, caddy->proc.rank, |
| status); |
| |
| /* pack the response */ |
| packmem(data, sz, buf); |
| |
| /* send the request */ |
| ep.type = PMIXP_EP_NOIDEID; |
| ep.ep.nodeid = caddy->sender_nodeid; |
| rc = pmixp_server_send_nb(&ep, PMIXP_MSG_DMDX, caddy->seq_num, buf, |
| pmixp_server_sent_buf_cb, buf); |
| if (SLURM_SUCCESS != rc) { |
| char *nodename = pmixp_info_job_host(caddy->sender_nodeid); |
| /* not much we can do here. Caller will react by timeout */ |
| PMIXP_ERROR("Cannot send direct modex response to %s", |
| nodename); |
| } |
| _dmdx_free_caddy(caddy); |
| } |
| |
| int pmixp_dmdx_get(const pmix_nspace_t nspace, int rank, |
| void *cbfunc, void *cbdata) |
| { |
| dmdx_req_info_t *req; |
| buf_t *buf; |
| int rc; |
| uint32_t seq; |
| pmixp_ep_t ep; |
| |
| /* need to send the request */ |
| ep.type = PMIXP_EP_NOIDEID; |
| ep.ep.nodeid = pmixp_nspace_resolve(nspace, rank); |
| |
| buf = pmixp_server_buf_new(); |
| /* setup message header */ |
| _setup_header(buf, DMDX_REQUEST, nspace, rank, SLURM_SUCCESS); |
| |
| /* store cur seq. num and move to the next request */ |
| seq = _dmdx_seq_num++; |
| |
| /* track this request */ |
| req = xmalloc(sizeof(dmdx_req_info_t)); |
| req->seq_num = seq; |
| req->cbfunc = cbfunc; |
| req->cbdata = cbdata; |
| req->ts = time(NULL); |
| #ifndef NDEBUG |
| strlcpy(req->nspace, nspace, sizeof(req->nspace)); |
| req->rank = rank; |
| #endif |
| list_append(_dmdx_requests, req); |
| |
| /* send the request */ |
| rc = pmixp_server_send_nb(&ep, PMIXP_MSG_DMDX, seq, buf, |
| pmixp_server_sent_buf_cb, buf); |
| |
| /* check the return status */ |
| if (SLURM_SUCCESS != rc) { |
| char *nodename = pmixp_info_job_host(ep.ep.nodeid); |
| PMIXP_ERROR("Cannot send direct modex request to %s, size %d", |
| nodename, get_buf_offset(buf)); |
| xfree(nodename); |
| pmixp_lib_modex_invoke(cbfunc, SLURM_ERROR, NULL, 0, |
| cbdata, NULL, NULL); |
| rc = SLURM_ERROR; |
| } |
| |
| return rc; |
| } |
| |
| static void _dmdx_req(buf_t *buf, int nodeid, uint32_t seq_num) |
| { |
| int rank, rc; |
| int status; |
| char *ns = NULL, *sender_ns = NULL; |
| pmixp_namespace_t *nsptr; |
| dmdx_caddy_t *caddy = NULL; |
| |
| rc = _read_info(buf, &ns, &rank, &sender_ns,&status); |
| if (SLURM_SUCCESS != rc) { |
| char *nodename = pmixp_info_job_host(nodeid); |
| /* there is not much we can do here, but data corruption |
| * shouldn't happen */ |
| PMIXP_ERROR("Fail to unpack header data in request from %s, rc = %d", |
| nodename, rc); |
| xfree(nodename); |
| goto exit; |
| } |
| |
| if (0 != xstrcmp(ns, pmixp_info_namespace())) { |
| /* request for namespase that is not controlled by this daemon |
| * considered as error. This may change in future. */ |
| char *nodename = pmixp_info_job_host(nodeid); |
| PMIXP_ERROR("Bad request from %s: asked for nspace = %s, mine is %s", |
| nodename, ns, pmixp_info_namespace()); |
| _respond_with_error(seq_num, nodeid, sender_ns, |
| PMIX_ERR_INVALID_NAMESPACE); |
| xfree(nodename); |
| goto exit; |
| } |
| |
| nsptr = pmixp_nspaces_local(); |
| if (nsptr->ntasks <= rank) { |
| char *nodename = pmixp_info_job_host(nodeid); |
| PMIXP_ERROR("Bad request from %s: nspace \"%s\" has only %d ranks, asked for %d", |
| nodename, ns, nsptr->ntasks, rank); |
| _respond_with_error(seq_num, nodeid, sender_ns, |
| PMIX_ERR_BAD_PARAM); |
| xfree(nodename); |
| goto exit; |
| } |
| |
| /* setup temp structure to handle information from _dmdx_pmix_cb */ |
| caddy = xmalloc(sizeof(dmdx_caddy_t)); |
| caddy->seq_num = seq_num; |
| |
| /* ns is a pointer inside incoming buffer */ |
| strlcpy(caddy->proc.nspace, ns, sizeof(caddy->proc.nspace)); |
| ns = NULL; /* protect the data */ |
| caddy->proc.rank = rank; |
| |
| /* sender_host was passed from outside - copy it */ |
| caddy->sender_nodeid = nodeid; |
| |
| /* sender_ns is a pointer inside incoming buffer */ |
| caddy->sender_ns = xstrdup(sender_ns); |
| sender_ns = NULL; |
| |
| rc = pmixp_lib_dmodex_request(&caddy->proc, (void *)_dmdx_pmix_cb, |
| (void *)caddy); |
| if (SLURM_SUCCESS != rc) { |
| char *nodename = pmixp_info_job_host(nodeid); |
| PMIXP_ERROR("Can't request modex data from libpmix-server, requesting host = %s, nspace = %s, rank = %d, rc = %d", |
| nodename, caddy->proc.nspace, |
| caddy->proc.rank, rc); |
| _respond_with_error(seq_num, nodeid, caddy->sender_ns, rc); |
| _dmdx_free_caddy(caddy); |
| xfree(nodename); |
| } |
| exit: |
| /* we don't need this buffer anymore */ |
| FREE_NULL_BUFFER(buf); |
| |
| /* no sense to return errors, engine can't do anything |
| * anyway. We've notified libpmix, that's enough */ |
| } |
| |
| static int _dmdx_req_cmp(void *x, void *key) |
| { |
| dmdx_req_info_t *req = (dmdx_req_info_t *)x; |
| uint32_t seq_num = *((uint32_t *)key); |
| return (req->seq_num == seq_num); |
| } |
| |
| static void _dmdx_resp(buf_t *buf, int nodeid, uint32_t seq_num) |
| { |
| dmdx_req_info_t *req; |
| int rank, rc = SLURM_SUCCESS; |
| int status; |
| char *ns = NULL, *sender_ns = NULL; |
| char *data = NULL; |
| uint32_t size = 0; |
| |
| /* find the request tracker */ |
| list_itr_t *it = list_iterator_create(_dmdx_requests); |
| req = (dmdx_req_info_t *)list_find(it, _dmdx_req_cmp, &seq_num); |
| if (NULL == req) { |
| char *nodename = pmixp_info_job_host(nodeid); |
| /* We haven't sent this request! */ |
| PMIXP_ERROR("Received DMDX response with bad seq_num=%d from %s!", |
| seq_num, nodename); |
| list_iterator_destroy(it); |
| rc = SLURM_ERROR; |
| xfree(nodename); |
| goto exit; |
| } |
| |
| /* get the service data */ |
| rc = _read_info(buf, &ns, &rank, &sender_ns, &status); |
| if (SLURM_SUCCESS != rc) { |
| /* notify libpmix about an error */ |
| pmixp_lib_modex_invoke(req->cbfunc, SLURM_ERROR, NULL, 0, |
| req->cbdata, NULL, NULL); |
| goto exit; |
| } |
| |
| /* get the modex blob */ |
| if (SLURM_SUCCESS != (rc = unpackmem_ptr(&data, &size, buf))) { |
| /* notify libpmix about an error */ |
| pmixp_lib_modex_invoke(req->cbfunc, SLURM_ERROR, NULL, 0, |
| req->cbdata, NULL, NULL); |
| goto exit; |
| } |
| |
| /* call back to libpmix-server */ |
| pmixp_lib_modex_invoke(req->cbfunc, status, data, size, |
| req->cbdata, pmixp_free_buf, (void *)buf); |
| |
| /* release tracker & list iterator */ |
| req = NULL; |
| list_delete_item(it); |
| list_iterator_destroy(it); |
| exit: |
| if (SLURM_SUCCESS != rc) { |
| /* we are not expect libpmix to call the callback |
| * to cleanup this buffer */ |
| FREE_NULL_BUFFER(buf); |
| } |
| /* no sense to return errors, engine can't do anything |
| * anyway. We've notified libpmix, that's enough */ |
| } |
| |
| void pmixp_dmdx_process(buf_t *buf, int nodeid, uint32_t seq) |
| { |
| dmdx_type_t type = 0; |
| _read_type(buf, &type); |
| |
| switch (type) { |
| case DMDX_REQUEST: |
| _dmdx_req(buf, nodeid, seq); |
| break; |
| case DMDX_RESPONSE: |
| _dmdx_resp(buf, nodeid, seq); |
| break; |
| default:{ |
| char *nodename = pmixp_info_job_host(nodeid); |
| PMIXP_ERROR("Bad request from host %s. Skip", nodename); |
| xfree(nodename); |
| break; |
| } |
| } |
| } |
| |
| void pmixp_dmdx_timeout_cleanup(void) |
| { |
| list_itr_t *it = list_iterator_create(_dmdx_requests); |
| dmdx_req_info_t *req = NULL; |
| time_t ts = time(NULL); |
| |
| /* run through all requests and discard stale one's */ |
| while ((req = list_next(it))) { |
| if ((ts - req->ts) > pmixp_info_timeout()) { |
| #ifndef NDEBUG |
| /* respond with the timeout to libpmix */ |
| int nodeid = pmixp_nspace_resolve(req->nspace, |
| req->rank); |
| char *nodename = pmixp_info_job_host(nodeid); |
| xassert(NULL != nodename); |
| PMIXP_ERROR("timeout: ns=%s, rank=%d, host=%s, ts=%lu", |
| req->nspace, req->rank, |
| (NULL != nodename) ? nodename : "unknown", |
| ts); |
| if (NULL != nodename) { |
| xfree(nodename); |
| } |
| #endif |
| /* PMIX_ERR_TIMEOUT */ |
| pmixp_lib_modex_invoke(req->cbfunc, SLURM_ERROR, NULL, 0, |
| req->cbdata, NULL, NULL); |
| /* release tracker & list iterator */ |
| list_delete_item(it); |
| } |
| } |
| list_iterator_destroy(it); |
| } |