| /*****************************************************************************\ |
| ** pmixp_coll_tree.c - PMIx tree collective primitives |
| ***************************************************************************** |
| * Copyright (C) 2014-2015 Artem Polyakov. All rights reserved. |
| * Copyright (C) 2015-2018 Mellanox Technologies. All rights reserved. |
| * Written by Artem Polyakov <artpol84@gmail.com, artemp@mellanox.com>, |
| * Boris Karasev <karasev.b@gmail.com, boriska@mellanox.com>. |
| * |
| * This file is part of Slurm, a resource management program. |
| * For details, see <https://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * Slurm is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with Slurm; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #include "pmixp_common.h" |
| #include "src/common/reverse_tree.h" |
| #include "src/common/slurm_protocol_api.h" |
| #include "pmixp_coll.h" |
| #include "pmixp_nspaces.h" |
| #include "pmixp_server.h" |
| #include "pmixp_client.h" |
| |
| static void _progress_coll_tree(pmixp_coll_t *coll); |
| static void _reset_coll(pmixp_coll_t *coll); |
| |
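| /* |
| * Pack the service header that precedes the payload in both the upward |
| * and downward buffers: the collective type, the number of processes in |
| * the process set, and the namespace/rank of each process. The layout |
| * must stay in sync with pmixp_coll_tree_unpack(). |
| */ |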
| static int _pack_coll_info(pmixp_coll_t *coll, buf_t *buf) |
| { |
| pmix_proc_t *procs = coll->pset.procs; |
| size_t nprocs = coll->pset.nprocs; |
| uint32_t size; |
| int i; |
| |
| /* 1. store the type of collective */ |
| size = coll->type; |
| pack32(size, buf); |
| |
| /* 2. Put the number of ranges */ |
| pack32(nprocs, buf); |
| for (i = 0; i < (int)nprocs; i++) { |
| /* Pack namespace and rank of the i-th process */ |
| packmem(procs[i].nspace, strlen(procs[i].nspace) + 1, buf); |
| pack32(procs[i].rank, buf); |
| } |
| |
| return SLURM_SUCCESS; |
| } |
| |
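| /* |
| * Unpack the service header produced by _pack_coll_info() on a peer: |
| * recover the collective type and the namespace/rank list. The caller |
| * owns the xmalloc'ed array returned through "r". |
| */ |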
| int pmixp_coll_tree_unpack(buf_t *buf, pmixp_coll_type_t *type, |
| int *nodeid, pmix_proc_t **r, size_t *nr) |
| { |
| pmix_proc_t *procs = NULL; |
| uint32_t nprocs = 0; |
| uint32_t tmp; |
| int i, rc; |
| char *temp_ptr; |
| |
| /* 1. extract the type of collective */ |
| if (SLURM_SUCCESS != (rc = unpack32(&tmp, buf))) { |
| PMIXP_ERROR("Cannot unpack collective type"); |
| return rc; |
| } |
| *type = tmp; |
| |
| /* 2. get the number of ranges */ |
| if (SLURM_SUCCESS != (rc = unpack32(&nprocs, buf))) { |
| PMIXP_ERROR("Cannot unpack collective type"); |
| return rc; |
| } |
| *nr = nprocs; |
| |
| procs = xmalloc(sizeof(pmix_proc_t) * nprocs); |
| *r = procs; |
| |
| for (i = 0; i < (int)nprocs; i++) { |
| /* 3. get namespace/rank of particular process */ |
| if ((rc = unpackmem_ptr(&temp_ptr, &tmp, buf)) || |
| (strlcpy(procs[i].nspace, temp_ptr, |
| sizeof(procs[i].nspace)) > PMIX_MAX_NSLEN)) { |
| PMIXP_ERROR("Cannot unpack namespace for process #%d", |
| i); |
| /* rc may still be SLURM_SUCCESS if only the strlcpy check failed */ |
| return rc ? rc : SLURM_ERROR; |
| } |
| |
| rc = unpack32(&tmp, buf); |
| procs[i].rank = tmp; |
| if (SLURM_SUCCESS != rc) { |
| PMIXP_ERROR("Cannot unpack ranks for process #%d, nsp=%s", |
| i, procs[i].nspace); |
| return rc; |
| } |
| } |
| return SLURM_SUCCESS; |
| } |
| |
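| /* |
| * Reset the upward (fan-in) half of the collective: forget all local and |
| * child contributions and re-initialize the upward buffer with a fresh |
| * service header so it is ready for the next collective. |
| */ |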
| static void _reset_coll_ufwd(pmixp_coll_t *coll) |
| { |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| |
| /* upward status */ |
| tree->contrib_children = 0; |
| tree->contrib_local = false; |
| memset(tree->contrib_chld, 0, |
| sizeof(tree->contrib_chld[0]) * tree->chldrn_cnt); |
| tree->serv_offs = pmixp_server_buf_reset(tree->ufwd_buf); |
| if (SLURM_SUCCESS != _pack_coll_info(coll, tree->ufwd_buf)) { |
| PMIXP_ERROR("Cannot pack ranges to message header!"); |
| } |
| tree->ufwd_offset = get_buf_offset(tree->ufwd_buf); |
| tree->ufwd_status = PMIXP_COLL_TREE_SND_NONE; |
| } |
| |
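| /* |
| * Reset the downward (fan-out) half of the collective: re-initialize the |
| * downward buffer with a fresh service header and clear the send |
| * completion counters and the parent contribution flag. |
| */ |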
| static void _reset_coll_dfwd(pmixp_coll_t *coll) |
| { |
| /* downwards status */ |
| (void)pmixp_server_buf_reset(coll->state.tree.dfwd_buf); |
| if (SLURM_SUCCESS != _pack_coll_info(coll, coll->state.tree.dfwd_buf)) { |
| PMIXP_ERROR("Cannot pack ranges to message header!"); |
| } |
| coll->state.tree.dfwd_cb_cnt = 0; |
| coll->state.tree.dfwd_cb_wait = 0; |
| coll->state.tree.dfwd_status = PMIXP_COLL_TREE_SND_NONE; |
| coll->state.tree.contrib_prnt = false; |
| /* Save the total service offset */ |
| coll->state.tree.dfwd_offset = get_buf_offset( |
| coll->state.tree.dfwd_buf); |
| } |
| |
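| /* |
| * Return the collective to the SYNC state. Depending on the current |
| * state only the still-active portion is reset; the other portion may |
| * already contain data for the next collective, in which case we move |
| * straight to COLLECT. |
| */ |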
| static void _reset_coll(pmixp_coll_t *coll) |
| { |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| |
| switch (tree->state) { |
| case PMIXP_COLL_TREE_SYNC: |
| /* already reset */ |
| xassert(!tree->contrib_local && !tree->contrib_children && |
| !tree->contrib_prnt); |
| break; |
| case PMIXP_COLL_TREE_COLLECT: |
| case PMIXP_COLL_TREE_UPFWD: |
| case PMIXP_COLL_TREE_UPFWD_WSC: |
| coll->seq++; |
| tree->state = PMIXP_COLL_TREE_SYNC; |
| _reset_coll_ufwd(coll); |
| _reset_coll_dfwd(coll); |
| coll->cbdata = NULL; |
| coll->cbfunc = NULL; |
| break; |
| case PMIXP_COLL_TREE_UPFWD_WPC: |
| /* If we were waiting for the parent contrib, |
| * upward portion is already reset, and may contain |
| * next collective's data */ |
| case PMIXP_COLL_TREE_DOWNFWD: |
| /* same with downward state */ |
| coll->seq++; |
| _reset_coll_dfwd(coll); |
| if (tree->contrib_local || tree->contrib_children) { |
| /* next collective was already started */ |
| tree->state = PMIXP_COLL_TREE_COLLECT; |
| } else { |
| tree->state = PMIXP_COLL_TREE_SYNC; |
| } |
| break; |
| default: |
| PMIXP_ERROR("Bad collective state = %d", (int)tree->state); |
| /* collective is spoiled, reset state */ |
| tree->state = PMIXP_COLL_TREE_SYNC; |
| slurm_kill_job_step(pmixp_info_jobid(), |
| pmixp_info_stepid(), SIGKILL, 0); |
| } |
| } |
| |
| /* |
| * Based on ideas provided by Hongjia Cao <hjcao@nudt.edu.cn> in PMI2 plugin |
| */ |
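| /* |
| * Build the reverse-tree topology for this collective: derive our parent |
| * and direct children from slurm_conf.tree_width, translate their tree |
| * positions into global job node ids and allocate the upward/downward |
| * forwarding buffers. |
| */ |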
| int pmixp_coll_tree_init(pmixp_coll_t *coll, hostlist_t **hl) |
| { |
| int max_depth, width, depth, i; |
| char *p; |
| pmixp_coll_tree_t *tree = NULL; |
| |
| tree = &coll->state.tree; |
| tree->state = PMIXP_COLL_TREE_SYNC; |
| |
| width = slurm_conf.tree_width; |
| reverse_tree_info(coll->my_peerid, coll->peers_cnt, width, |
| &tree->prnt_peerid, &tree->chldrn_cnt, &depth, |
| &max_depth); |
| |
| /* We are interested in the number of direct children */ |
| tree->contrib_children = 0; |
| tree->contrib_local = false; |
| tree->chldrn_ids = xmalloc(sizeof(int) * width); |
| tree->contrib_chld = xmalloc(sizeof(int) * width); |
| tree->chldrn_cnt = reverse_tree_direct_children(coll->my_peerid, |
| coll->peers_cnt, |
| width, depth, |
| tree->chldrn_ids); |
| if (tree->prnt_peerid == -1) { |
| /* if we are the root of the tree: |
| * - we don't have a parent; |
| * - we keep the full all_chldrn list (without ourselves) |
| */ |
| tree->prnt_host = NULL; |
| tree->all_chldrn_hl = hostlist_copy(*hl); |
| hostlist_delete_host(tree->all_chldrn_hl, |
| pmixp_info_hostname()); |
| tree->chldrn_str = |
| hostlist_ranged_string_xmalloc(tree->all_chldrn_hl); |
| } else { |
| /* for all other nodes in the tree we need to know: |
| * - the nodename of our parent; |
| * - the all_chldrn list and hl are not needed anymore |
| */ |
| |
| /* |
| * setup parent id's |
| */ |
| p = hostlist_nth(*hl, tree->prnt_peerid); |
| tree->prnt_host = xstrdup(p); |
| free(p); |
| /* reset prnt_peerid to the global peer */ |
| tree->prnt_peerid = pmixp_info_job_hostid(tree->prnt_host); |
| |
| /* |
| * setup root id's |
| * (we need this for the Slurm API communication case) |
| */ |
| p = hostlist_nth(*hl, 0); |
| tree->root_host = xstrdup(p); |
| free(p); |
| /* reset root_peerid to the global peer */ |
| tree->root_peerid = pmixp_info_job_hostid(tree->root_host); |
| |
| /* use empty hostlist here */ |
| tree->all_chldrn_hl = hostlist_create(""); |
| tree->chldrn_str = NULL; |
| } |
| |
| /* fixup children peer ids to the global ones */ |
| for (i = 0; i < tree->chldrn_cnt; i++) { |
| p = hostlist_nth(*hl, tree->chldrn_ids[i]); |
| tree->chldrn_ids[i] = pmixp_info_job_hostid(p); |
| free(p); |
| } |
| |
| /* Collective state */ |
| tree->ufwd_buf = pmixp_server_buf_new(); |
| tree->dfwd_buf = pmixp_server_buf_new(); |
| _reset_coll_ufwd(coll); |
| _reset_coll_dfwd(coll); |
| coll->cbdata = NULL; |
| coll->cbfunc = NULL; |
| |
| /* init fine grained lock */ |
| slurm_mutex_init(&coll->lock); |
| |
| return SLURM_SUCCESS; |
| } |
| |
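| /* Release all memory owned by the tree part of the collective state */ |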
| void pmixp_coll_tree_free(pmixp_coll_tree_t *tree) |
| { |
| if (NULL != tree->prnt_host) { |
| xfree(tree->prnt_host); |
| } |
| if (NULL != tree->root_host) { |
| xfree(tree->root_host); |
| } |
| hostlist_destroy(tree->all_chldrn_hl); |
| if (tree->chldrn_str) { |
| xfree(tree->chldrn_str); |
| } |
| if (NULL != tree->contrib_chld) { |
| xfree(tree->contrib_chld); |
| } |
| /* chldrn_ids is xmalloc'ed in pmixp_coll_tree_init() */ |
| xfree(tree->chldrn_ids); |
| FREE_NULL_BUFFER(tree->ufwd_buf); |
| FREE_NULL_BUFFER(tree->dfwd_buf); |
| } |
| |
| typedef struct { |
| pmixp_coll_t *coll; |
| uint32_t seq; |
| volatile uint32_t refcntr; |
| } pmixp_coll_cbdata_t; |
| |
| /* |
| * Recover the collective descriptor from the send-callback data. |
| * Used by the internal collective performance evaluation tool. |
| */ |
| pmixp_coll_t *pmixp_coll_tree_from_cbdata(void *cbdata) |
| { |
| pmixp_coll_cbdata_t *ptr = (pmixp_coll_cbdata_t*)cbdata; |
| pmixp_coll_sanity_check(ptr->coll); |
| return ptr->coll; |
| } |
| |
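| /* |
| * Completion callback for the upward (fan-in) send to our parent. |
| * Records the send status and, unless called inline, takes the lock and |
| * drives the collective state machine forward. |
| */ |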
| static void _ufwd_sent_cb(int rc, pmixp_p2p_ctx_t ctx, void *_vcbdata) |
| { |
| pmixp_coll_cbdata_t *cbdata = (pmixp_coll_cbdata_t*)_vcbdata; |
| pmixp_coll_t *coll = cbdata->coll; |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| |
| if (PMIXP_P2P_REGULAR == ctx) { |
| /* lock the collective */ |
| slurm_mutex_lock(&coll->lock); |
| } |
| if (cbdata->seq != coll->seq) { |
| /* it seems like this collective was reset since the time |
| * we initiated this send. |
| * Just exit to avoid data corruption. |
| */ |
| PMIXP_DEBUG("Collective was reset!"); |
| goto exit; |
| } |
| |
| xassert(PMIXP_COLL_TREE_UPFWD == tree->state || |
| PMIXP_COLL_TREE_UPFWD_WSC == tree->state); |
| |
| |
| /* Change the status */ |
| if (SLURM_SUCCESS == rc) { |
| tree->ufwd_status = PMIXP_COLL_TREE_SND_DONE; |
| } else { |
| tree->ufwd_status = PMIXP_COLL_TREE_SND_FAILED; |
| } |
| |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: state: %s, snd_status=%s", |
| coll, pmixp_coll_tree_state2str(tree->state), |
| pmixp_coll_tree_sndstatus2str(tree->ufwd_status)); |
| #endif |
| |
| exit: |
| xassert(0 < cbdata->refcntr); |
| cbdata->refcntr--; |
| if (!cbdata->refcntr) { |
| xfree(cbdata); |
| } |
| |
| if (PMIXP_P2P_REGULAR == ctx) { |
| /* progress, in the inline case progress |
| * will be invoked by the caller */ |
| _progress_coll_tree(coll); |
| |
| /* unlock the collective */ |
| slurm_mutex_unlock(&coll->lock); |
| } |
| } |
| |
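| /* |
| * Completion callback for one downward (fan-out) send to a child. |
| * Successful completions are counted against dfwd_cb_wait; any failure |
| * marks the whole downward phase as failed. |
| */ |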
| static void _dfwd_sent_cb(int rc, pmixp_p2p_ctx_t ctx, void *_vcbdata) |
| { |
| pmixp_coll_cbdata_t *cbdata = (pmixp_coll_cbdata_t*)_vcbdata; |
| pmixp_coll_t *coll = cbdata->coll; |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| |
| if (PMIXP_P2P_REGULAR == ctx) { |
| /* lock the collective */ |
| slurm_mutex_lock(&coll->lock); |
| } |
| |
| if (cbdata->seq != coll->seq) { |
| /* it seems like this collective was reset since the time |
| * we initiated this send. |
| * Just exit to avoid data corruption. |
| */ |
| PMIXP_DEBUG("Collective was reset!"); |
| goto exit; |
| } |
| |
| xassert(PMIXP_COLL_TREE_DOWNFWD == tree->state); |
| |
| /* Change the status */ |
| if (SLURM_SUCCESS == rc) { |
| tree->dfwd_cb_cnt++; |
| } else { |
| tree->dfwd_status = PMIXP_COLL_TREE_SND_FAILED; |
| } |
| |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: state: %s, snd_status=%s, compl_cnt=%d/%d", |
| coll, pmixp_coll_tree_state2str(tree->state), |
| pmixp_coll_tree_sndstatus2str(tree->dfwd_status), |
| tree->dfwd_cb_cnt, tree->dfwd_cb_wait); |
| #endif |
| |
| exit: |
| xassert(0 < cbdata->refcntr); |
| cbdata->refcntr--; |
| if (!cbdata->refcntr) { |
| xfree(cbdata); |
| } |
| |
| if (PMIXP_P2P_REGULAR == ctx) { |
| /* progress, in the inline case progress |
| * will be invoked by the caller */ |
| _progress_coll_tree(coll); |
| |
| /* unlock the collective */ |
| slurm_mutex_unlock(&coll->lock); |
| } |
| } |
| |
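| /* |
| * Release callback invoked by libpmix once it is done with the locally |
| * delivered data; it counts as one more completion for the downward |
| * phase. |
| */ |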
| static void _libpmix_cb(void *_vcbdata) |
| { |
| pmixp_coll_cbdata_t *cbdata = (pmixp_coll_cbdata_t*)_vcbdata; |
| pmixp_coll_t *coll = cbdata->coll; |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| |
| /* lock the collective */ |
| slurm_mutex_lock(&coll->lock); |
| |
| if (cbdata->seq != coll->seq) { |
| /* it seems like this collective was reset since the time |
| * we initiated this send. |
| * Just exit to avoid data corruption. |
| */ |
| PMIXP_ERROR("%p: collective was reset: myseq=%u, curseq=%u", |
| coll, cbdata->seq, coll->seq); |
| goto exit; |
| } |
| |
| xassert(PMIXP_COLL_TREE_DOWNFWD == tree->state); |
| |
| tree->dfwd_cb_cnt++; |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: state: %s, snd_status=%s, compl_cnt=%d/%d", |
| coll, pmixp_coll_tree_state2str(tree->state), |
| pmixp_coll_tree_sndstatus2str(tree->dfwd_status), |
| tree->dfwd_cb_cnt, tree->dfwd_cb_wait); |
| #endif |
| _progress_coll_tree(coll); |
| |
| exit: |
| xassert(0 < cbdata->refcntr); |
| cbdata->refcntr--; |
| if (!cbdata->refcntr) { |
| xfree(cbdata); |
| } |
| |
| /* unlock the collective */ |
| slurm_mutex_unlock(&coll->lock); |
| } |
| |
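| /* |
| * COLLECT state: once the local contribution and all direct children |
| * have arrived, select the next state and send the aggregated upward |
| * buffer to the parent (the root instead copies it straight into the |
| * downward buffer). |
| */ |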
| static int _progress_collect(pmixp_coll_t *coll) |
| { |
| pmixp_ep_t ep = {0}; |
| int rc; |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| |
| xassert(PMIXP_COLL_TREE_COLLECT == tree->state); |
| |
| ep.type = PMIXP_EP_NONE; |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: state=%s, local=%d, child_cntr=%d", |
| coll, pmixp_coll_tree_state2str(tree->state), |
| (int)tree->contrib_local, tree->contrib_children); |
| #endif |
| /* sanity check */ |
| pmixp_coll_sanity_check(coll); |
| |
| if (PMIXP_COLL_TREE_COLLECT != tree->state) { |
| /* In case of race condition between libpmix and |
| * slurm threads we can be called |
| * after we moved to the next step. */ |
| return 0; |
| } |
| |
| if (!tree->contrib_local || |
| tree->contrib_children != tree->chldrn_cnt) { |
| /* Not yet ready to go to the next step */ |
| return 0; |
| } |
| |
| if (pmixp_info_srv_direct_conn()) { |
| /* We will need to forward aggregated |
| * message back to our children */ |
| tree->state = PMIXP_COLL_TREE_UPFWD; |
| } else { |
| /* If we use Slurm API (SAPI) - intermediate nodes |
| * don't need to forward data as the root will do |
| * a SAPI broadcast. |
| * So, only the root has to go through the full UPFWD |
| * state and send the message back. |
| * Other procs have to go through a different route |
| * because some of our children can receive the bcast |
| * message early and initiate the next collective. We need |
| * to handle that properly. |
| */ |
| if (0 > tree->prnt_peerid) { |
| tree->state = PMIXP_COLL_TREE_UPFWD; |
| } else { |
| tree->state = PMIXP_COLL_TREE_UPFWD_WSC; |
| } |
| } |
| |
| /* The root of the collective will have prnt_host == NULL */ |
| if (NULL != tree->prnt_host) { |
| ep.type = PMIXP_EP_NOIDEID; |
| ep.ep.nodeid = tree->prnt_peerid; |
| tree->ufwd_status = PMIXP_COLL_TREE_SND_ACTIVE; |
| PMIXP_DEBUG("%p: send data to %s:%d", |
| coll, tree->prnt_host, tree->prnt_peerid); |
| } else { |
| /* move data from input buffer to the output */ |
| char *dst, *src = get_buf_data(tree->ufwd_buf) + |
| tree->ufwd_offset; |
| size_t size = get_buf_offset(tree->ufwd_buf) - |
| tree->ufwd_offset; |
| if (try_grow_buf_remaining(tree->dfwd_buf, size)) |
| return 0; |
| dst = get_buf_data(tree->dfwd_buf) + tree->dfwd_offset; |
| memcpy(dst, src, size); |
| set_buf_offset(tree->dfwd_buf, tree->dfwd_offset + size); |
| /* no need to send */ |
| tree->ufwd_status = PMIXP_COLL_TREE_SND_DONE; |
| /* this is root */ |
| tree->contrib_prnt = true; |
| } |
| |
| if (PMIXP_EP_NONE != ep.type) { |
| pmixp_coll_cbdata_t *cbdata; |
| cbdata = xmalloc(sizeof(pmixp_coll_cbdata_t)); |
| cbdata->coll = coll; |
| cbdata->seq = coll->seq; |
| cbdata->refcntr = 1; |
| char *nodename = tree->prnt_host; |
| rc = pmixp_server_send_nb(&ep, PMIXP_MSG_FAN_IN, coll->seq, |
| tree->ufwd_buf, |
| _ufwd_sent_cb, cbdata); |
| |
| if (SLURM_SUCCESS != rc) { |
| PMIXP_ERROR("Cannot send data (size = %u), to %s:%d", |
| get_buf_offset(tree->ufwd_buf), |
| nodename, ep.ep.nodeid); |
| tree->ufwd_status = PMIXP_COLL_TREE_SND_FAILED; |
| } |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: fwd to %s:%d, size = %u", |
| coll, nodename, ep.ep.nodeid, |
| get_buf_offset(tree->ufwd_buf)); |
| #endif |
| } |
| |
| /* events observed - need another iteration */ |
| return true; |
| } |
| |
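| /* |
| * UPFWD state: the upward send has been issued and we wait both for its |
| * completion and for the parent contribution (the root sets the latter |
| * locally). Once both are in, switch to DOWNFWD and start forwarding the |
| * aggregated data to the children and to the local libpmix. |
| */ |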
| static int _progress_ufwd(pmixp_coll_t *coll) |
| { |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| pmixp_ep_t ep[tree->chldrn_cnt]; |
| int ep_cnt = 0; |
| int rc, i; |
| char *nodename = NULL; |
| pmixp_coll_cbdata_t *cbdata = NULL; |
| |
| xassert(PMIXP_COLL_TREE_UPFWD == tree->state); |
| |
| /* we have not switched to the downfwd phase yet; check why */ |
| |
| switch (tree->ufwd_status) { |
| case PMIXP_COLL_TREE_SND_FAILED: |
| /* something went wrong with upward send. |
| * notify libpmix about that and abort |
| * collective */ |
| |
| /* respond to the libpmix */ |
| pmixp_coll_localcb_nodata(coll, SLURM_ERROR); |
| |
| _reset_coll(coll); |
| /* Don't need to do anything else */ |
| return false; |
| case PMIXP_COLL_TREE_SND_ACTIVE: |
| /* still waiting for the send completion */ |
| return false; |
| case PMIXP_COLL_TREE_SND_DONE: |
| if (tree->contrib_prnt) { |
| /* all-set to go to the next stage */ |
| break; |
| } |
| return false; |
| default: |
| /* Should not happen */ |
| PMIXP_ERROR("Bad collective ufwd state=%d", |
| (int)tree->ufwd_status); |
| /* collective is spoiled, reset state */ |
| tree->state = PMIXP_COLL_TREE_SYNC; |
| slurm_kill_job_step(pmixp_info_jobid(), |
| pmixp_info_stepid(), SIGKILL, 0); |
| return false; |
| } |
| |
| /* We can now reuse the upward part for the next collective */ |
| _reset_coll_ufwd(coll); |
| |
| /* move to the next state */ |
| tree->state = PMIXP_COLL_TREE_DOWNFWD; |
| tree->dfwd_status = PMIXP_COLL_TREE_SND_ACTIVE; |
| if (!pmixp_info_srv_direct_conn()) { |
| /* only root of the tree should get here */ |
| xassert(0 > tree->prnt_peerid); |
| if (tree->chldrn_cnt) { |
| /* We can run on just one node */ |
| ep[ep_cnt].type = PMIXP_EP_HLIST; |
| ep[ep_cnt].ep.hostlist = tree->chldrn_str; |
| ep_cnt++; |
| } |
| } else { |
| for (i = 0; i < tree->chldrn_cnt; i++) { |
| ep[i].type = PMIXP_EP_NOIDEID; |
| ep[i].ep.nodeid = tree->chldrn_ids[i]; |
| ep_cnt++; |
| } |
| } |
| |
| /* We need to wait for ep_cnt send completions + the local callback */ |
| tree->dfwd_cb_wait = ep_cnt; |
| |
| if (ep_cnt || coll->cbfunc) { |
| /* allocate the callback data */ |
| cbdata = xmalloc(sizeof(pmixp_coll_cbdata_t)); |
| cbdata->coll = coll; |
| cbdata->seq = coll->seq; |
| cbdata->refcntr = ep_cnt; |
| if (coll->cbfunc) { |
| cbdata->refcntr++; |
| } |
| } |
| |
| for (i = 0; i < ep_cnt; i++) { |
| rc = pmixp_server_send_nb(&ep[i], PMIXP_MSG_FAN_OUT, coll->seq, |
| tree->dfwd_buf, |
| _dfwd_sent_cb, cbdata); |
| |
| if (SLURM_SUCCESS != rc) { |
| if (PMIXP_EP_NOIDEID == ep[i].type){ |
| nodename = pmixp_info_job_host(ep[i].ep.nodeid); |
| PMIXP_ERROR("Cannot send data (size = %u), " |
| "to %s:%d", |
| get_buf_offset(tree->dfwd_buf), |
| nodename, ep[i].ep.nodeid); |
| xfree(nodename); |
| } else { |
| PMIXP_ERROR("Cannot send data (size = %u), " |
| "to %s", |
| get_buf_offset(tree->dfwd_buf), |
| ep[i].ep.hostlist); |
| } |
| tree->dfwd_status = PMIXP_COLL_TREE_SND_FAILED; |
| } |
| #ifdef PMIXP_COLL_DEBUG |
| if (PMIXP_EP_NOIDEID == ep[i].type) { |
| nodename = pmixp_info_job_host(ep[i].ep.nodeid); |
| PMIXP_DEBUG("%p: fwd to %s:%d, size = %u", |
| coll, nodename, ep[i].ep.nodeid, |
| get_buf_offset(tree->dfwd_buf)); |
| xfree(nodename); |
| } else { |
| PMIXP_DEBUG("%p: fwd to %s, size = %u", |
| coll, ep[i].ep.hostlist, |
| get_buf_offset(tree->dfwd_buf)); |
| } |
| #endif |
| } |
| |
| if (coll->cbfunc) { |
| char *data = get_buf_data(tree->dfwd_buf) + tree->dfwd_offset; |
| size_t size = get_buf_offset(tree->dfwd_buf) - |
| tree->dfwd_offset; |
| tree->dfwd_cb_wait++; |
| pmixp_lib_modex_invoke(coll->cbfunc, SLURM_SUCCESS, |
| data, size, coll->cbdata, |
| _libpmix_cb, (void*)cbdata); |
| /* Clear callback info as we are not |
| * allowed to use it second time |
| */ |
| coll->cbfunc = NULL; |
| coll->cbdata = NULL; |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: local delivery, size = %lu", |
| coll, size); |
| #endif |
| } |
| |
| /* events observed - need another iteration */ |
| return true; |
| } |
| |
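| /* |
| * UPFWD_WSC state (waiting for the upward send to complete), used by |
| * non-root nodes in Slurm API mode: once the upward send completes, only |
| * the root's broadcast remains to be waited for, so move to UPFWD_WPC. |
| */ |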
| static int _progress_ufwd_sc(pmixp_coll_t *coll) |
| { |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| |
| xassert(PMIXP_COLL_TREE_UPFWD_WSC == tree->state); |
| |
| /* we have not moved to the next phase yet; check why */ |
| switch (tree->ufwd_status) { |
| case PMIXP_COLL_TREE_SND_FAILED: |
| /* something went wrong with upward send. |
| * notify libpmix about that and abort |
| * collective */ |
| |
| /* respond to the libpmix */ |
| pmixp_coll_localcb_nodata(coll, SLURM_ERROR); |
| |
| _reset_coll(coll); |
| /* Don't need to do anything else */ |
| return false; |
| case PMIXP_COLL_TREE_SND_ACTIVE: |
| /* still waiting for the send completion */ |
| return false; |
| case PMIXP_COLL_TREE_SND_DONE: |
| /* move to the next step */ |
| break; |
| default: |
| /* Should not happen */ |
| PMIXP_ERROR("Bad collective ufwd state=%d", |
| (int)tree->ufwd_status); |
| /* collective is spoiled, reset state */ |
| tree->state = PMIXP_COLL_TREE_SYNC; |
| slurm_kill_job_step(pmixp_info_jobid(), |
| pmixp_info_stepid(), SIGKILL, 0); |
| return false; |
| } |
| |
| /* We can now reuse the upward part for the next collective */ |
| _reset_coll_ufwd(coll); |
| |
| /* move to the next state */ |
| tree->state = PMIXP_COLL_TREE_UPFWD_WPC; |
| return true; |
| } |
| |
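| /* |
| * UPFWD_WPC state (waiting for the parent contribution): the upward part |
| * is already reset; once the root's data arrives, switch to DOWNFWD. In |
| * Slurm API mode the children receive the data via the broadcast, so |
| * only the local libpmix delivery is left to wait for. |
| */ |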
| static int _progress_ufwd_wpc(pmixp_coll_t *coll) |
| { |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| xassert(PMIXP_COLL_TREE_UPFWD_WPC == tree->state); |
| |
| if (!tree->contrib_prnt) { |
| return false; |
| } |
| |
| /* Need to wait only for the local completion callback, if installed */ |
| tree->dfwd_status = PMIXP_COLL_TREE_SND_ACTIVE; |
| tree->dfwd_cb_wait = 0; |
| |
| |
| /* move to the next state */ |
| tree->state = PMIXP_COLL_TREE_DOWNFWD; |
| |
| /* local delivery */ |
| if (coll->cbfunc) { |
| pmixp_coll_cbdata_t *cbdata; |
| cbdata = xmalloc(sizeof(pmixp_coll_cbdata_t)); |
| cbdata->coll = coll; |
| cbdata->seq = coll->seq; |
| cbdata->refcntr = 1; |
| |
| char *data = get_buf_data(tree->dfwd_buf) + tree->dfwd_offset; |
| size_t size = get_buf_offset(tree->dfwd_buf) - |
| tree->dfwd_offset; |
| pmixp_lib_modex_invoke(coll->cbfunc, SLURM_SUCCESS, data, size, |
| coll->cbdata, _libpmix_cb, |
| (void *)cbdata); |
| tree->dfwd_cb_wait++; |
| /* Clear callback info as we are not |
| * allowed to use it second time |
| */ |
| coll->cbfunc = NULL; |
| coll->cbdata = NULL; |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: local delivery, size = %lu", |
| coll, size); |
| #endif |
| } |
| |
| /* events observed - need another iteration */ |
| return true; |
| } |
| |
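| /* |
| * DOWNFWD state: wait until every child send and the local libpmix |
| * delivery have completed, then reset the collective for the next round. |
| */ |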
| static int _progress_dfwd(pmixp_coll_t *coll) |
| { |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| xassert(PMIXP_COLL_TREE_DOWNFWD == tree->state); |
| |
| /* check if all child sends + the local callback have completed */ |
| if (tree->dfwd_cb_wait == tree->dfwd_cb_cnt) { |
| tree->dfwd_status = PMIXP_COLL_TREE_SND_DONE; |
| } |
| |
| switch (tree->dfwd_status) { |
| case PMIXP_COLL_TREE_SND_ACTIVE: |
| return false; |
| case PMIXP_COLL_TREE_SND_FAILED: |
| /* something went wrong with the downward send. |
| * notify libpmix about that and abort |
| * the collective */ |
| PMIXP_ERROR("%p: failed to send, abort collective", coll); |
| |
| |
| /* respond to the libpmix */ |
| pmixp_coll_localcb_nodata(coll, SLURM_ERROR); |
| |
| _reset_coll(coll); |
| /* Don't need to do anything else */ |
| return false; |
| case PMIXP_COLL_TREE_SND_DONE: |
| break; |
| default: |
| /* Should not happen */ |
| PMIXP_ERROR("Bad collective dfwd state=%d", |
| (int)tree->dfwd_status); |
| /* collective is spoiled, reset state */ |
| tree->state = PMIXP_COLL_TREE_SYNC; |
| slurm_kill_job_step(pmixp_info_jobid(), |
| pmixp_info_stepid(), SIGKILL, 0); |
| return false; |
| } |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: %s seq=%d is DONE", coll, |
| pmixp_coll_type2str(coll->type), coll->seq); |
| #endif |
| _reset_coll(coll); |
| |
| return true; |
| } |
| |
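| /* |
| * Main state-machine driver: keep invoking the per-state handlers while |
| * any of them reports forward progress. |
| */ |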
| static void _progress_coll_tree(pmixp_coll_t *coll) |
| { |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| int ret = 0; |
| |
| do { |
| switch (tree->state) { |
| case PMIXP_COLL_TREE_SYNC: |
| /* check if any activity was observed */ |
| if (tree->contrib_local || tree->contrib_children) { |
| tree->state = PMIXP_COLL_TREE_COLLECT; |
| ret = true; |
| } else { |
| ret = false; |
| } |
| break; |
| case PMIXP_COLL_TREE_COLLECT: |
| ret = _progress_collect(coll); |
| break; |
| case PMIXP_COLL_TREE_UPFWD: |
| ret = _progress_ufwd(coll); |
| break; |
| case PMIXP_COLL_TREE_UPFWD_WSC: |
| ret = _progress_ufwd_sc(coll); |
| break; |
| case PMIXP_COLL_TREE_UPFWD_WPC: |
| ret = _progress_ufwd_wpc(coll); |
| break; |
| case PMIXP_COLL_TREE_DOWNFWD: |
| ret = _progress_dfwd(coll); |
| break; |
| default: |
| PMIXP_ERROR("%p: unknown state = %d", |
| coll, tree->state); |
| } |
| } while (ret); |
| } |
| |
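| /* |
| * Accept the local contribution coming from libpmix: append it to the |
| * upward buffer, remember the completion callback and try to progress |
| * the collective. A second local contribution to the same collective is |
| * rejected. |
| */ |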
| int pmixp_coll_tree_local(pmixp_coll_t *coll, char *data, size_t size, |
| void *cbfunc, void *cbdata) |
| { |
| pmixp_coll_tree_t *tree = NULL; |
| int ret = SLURM_SUCCESS; |
| |
| pmixp_debug_hang(0); |
| |
| /* sanity check */ |
| pmixp_coll_sanity_check(coll); |
| |
| /* lock the structure */ |
| slurm_mutex_lock(&coll->lock); |
| tree = &coll->state.tree; |
| |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: contrib/loc: seqnum=%u, state=%s, size=%zu", |
| coll, coll->seq, pmixp_coll_tree_state2str(tree->state), |
| size); |
| #endif |
| |
| switch (tree->state) { |
| case PMIXP_COLL_TREE_SYNC: |
| /* change the state */ |
| coll->ts = time(NULL); |
| /* fall-thru */ |
| case PMIXP_COLL_TREE_COLLECT: |
| /* sanity check */ |
| break; |
| case PMIXP_COLL_TREE_DOWNFWD: |
| /* We are waiting for some send requests |
| * to be finished, but local node has started |
| * the next contribution. |
| * This is an OK situation, go ahead and store |
| * it, the buffer with the contribution is not used |
| * now. |
| */ |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: contrib/loc: next coll!", coll); |
| #endif |
| break; |
| case PMIXP_COLL_TREE_UPFWD: |
| case PMIXP_COLL_TREE_UPFWD_WSC: |
| case PMIXP_COLL_TREE_UPFWD_WPC: |
| /* this is not a correct behavior, respond with an error. */ |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: contrib/loc: before prev coll is finished!", |
| coll); |
| #endif |
| ret = SLURM_ERROR; |
| goto exit; |
| default: |
| /* FATAL: should not happen in normal workflow */ |
| PMIXP_ERROR("%p: local contrib while active collective, state = %s", |
| coll, pmixp_coll_tree_state2str(tree->state)); |
| /* collective is spoiled, reset state */ |
| tree->state = PMIXP_COLL_TREE_SYNC; |
| slurm_kill_job_step(pmixp_info_jobid(), |
| pmixp_info_stepid(), SIGKILL, 0); |
| ret = SLURM_ERROR; |
| goto exit; |
| } |
| |
| if (tree->contrib_local) { |
| /* Double contribution - reject |
| * FIXME: check if need to support multiple non-blocking |
| * operations on the same process set */ |
| ret = SLURM_ERROR; |
| goto exit; |
| } |
| |
| /* save & mark local contribution */ |
| tree->contrib_local = true; |
| if ((ret = try_grow_buf_remaining(tree->ufwd_buf, size))) |
| goto exit; |
| memcpy(get_buf_data(tree->ufwd_buf) + get_buf_offset(tree->ufwd_buf), |
| data, size); |
| set_buf_offset(tree->ufwd_buf, get_buf_offset(tree->ufwd_buf) + size); |
| |
| /* setup callback info */ |
| coll->cbfunc = cbfunc; |
| coll->cbdata = cbdata; |
| |
| /* check if the collective is ready to progress */ |
| _progress_coll_tree(coll); |
| |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: finish, state=%s", coll, |
| pmixp_coll_tree_state2str(tree->state)); |
| #endif |
| |
| exit: |
| /* unlock the structure */ |
| slurm_mutex_unlock(&coll->lock); |
| return ret; |
| } |
| |
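| /* Map a global node id to its index among our direct children, -1 if none */ |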
| static int _chld_id(pmixp_coll_tree_t *tree, uint32_t nodeid) |
| { |
| int i; |
| |
| for (i=0; i<tree->chldrn_cnt; i++) { |
| if (tree->chldrn_ids[i] == nodeid) { |
| return i; |
| } |
| } |
| return -1; |
| } |
| |
| static char *_chld_ids_str(pmixp_coll_tree_t *tree) |
| { |
| char *p = NULL; |
| int i; |
| |
| for (i=0; i<tree->chldrn_cnt; i++) { |
| if ((tree->chldrn_cnt-1) > i) { |
| xstrfmtcat(p, "%d, ", tree->chldrn_ids[i]); |
| } else { |
| xstrfmtcat(p, "%d", tree->chldrn_ids[i]); |
| } |
| } |
| return p; |
| } |
| |
| |
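| /* |
| * Accept a fan-in contribution from one of our direct children: validate |
| * the sender and sequence number, append the payload to the upward |
| * buffer and try to progress the collective. Duplicate deliveries caused |
| * by retransmission are detected and skipped. |
| */ |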
| int pmixp_coll_tree_child(pmixp_coll_t *coll, uint32_t peerid, uint32_t seq, |
| buf_t *buf) |
| { |
| char *data_src = NULL, *data_dst = NULL; |
| uint32_t size; |
| int chld_id; |
| pmixp_coll_tree_t *tree = NULL; |
| |
| /* lock the structure */ |
| slurm_mutex_lock(&coll->lock); |
| pmixp_coll_sanity_check(coll); |
| tree = &coll->state.tree; |
| |
| if (0 > (chld_id = _chld_id(tree, peerid))) { |
| char *nodename = pmixp_info_job_host(peerid); |
| char *avail_ids = _chld_ids_str(tree); |
| PMIXP_DEBUG("%p: contribution from the non-child node %s:%u, acceptable ids: %s", |
| coll, nodename, peerid, avail_ids); |
| xfree(nodename); |
| xfree(avail_ids); |
| /* don't index contrib_chld[] below with a negative id */ |
| slurm_mutex_unlock(&coll->lock); |
| return SLURM_ERROR; |
| } |
| |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: contrib/rem from nodeid=%u, childid=%d, state=%s, size=%u", |
| coll, peerid, chld_id, |
| pmixp_coll_tree_state2str(tree->state), |
| remaining_buf(buf)); |
| #endif |
| |
| switch (tree->state) { |
| case PMIXP_COLL_TREE_SYNC: |
| /* change the state */ |
| coll->ts = time(NULL); |
| /* fall-thru */ |
| case PMIXP_COLL_TREE_COLLECT: |
| /* sanity check */ |
| if (coll->seq != seq) { |
| char *nodename = pmixp_info_job_host(peerid); |
| /* FATAL: should not happen in normal workflow */ |
| PMIXP_ERROR("%p: unexpected contrib from %s:%d (child #%d) seq = %d, coll->seq = %d, state=%s", |
| coll, nodename, peerid, chld_id, |
| seq, coll->seq, |
| pmixp_coll_tree_state2str(tree->state)); |
| xfree(nodename); |
| goto error; |
| } |
| break; |
| case PMIXP_COLL_TREE_UPFWD: |
| case PMIXP_COLL_TREE_UPFWD_WSC: |
| { |
| char *nodename = pmixp_info_job_host(peerid); |
| /* FATAL: should not happen in normal workflow */ |
| PMIXP_ERROR("%p: unexpected contrib from %s:%d, state = %s", |
| coll, nodename, peerid, |
| pmixp_coll_tree_state2str(tree->state)); |
| xfree(nodename); |
| goto error; |
| } |
| case PMIXP_COLL_TREE_UPFWD_WPC: |
| case PMIXP_COLL_TREE_DOWNFWD: |
| #ifdef PMIXP_COLL_DEBUG |
| /* It looks like a retransmission attempt when remote side |
| * identified transmission failure, but we actually successfully |
| * received the message */ |
| PMIXP_DEBUG("%p: contrib for the next coll. nodeid=%u, child=%d seq=%u, coll->seq=%u, state=%s", |
| coll, peerid, chld_id, seq, coll->seq, |
| pmixp_coll_tree_state2str(tree->state)); |
| #endif |
| if ((coll->seq + 1) != seq) { |
| char *nodename = pmixp_info_job_host(peerid); |
| /* should not happen in normal workflow */ |
| PMIXP_ERROR("%p: unexpected contrib from %s:%d(x:%d) seq = %d, coll->seq = %d, state=%s", |
| coll, nodename, peerid, chld_id, |
| seq, coll->seq, |
| pmixp_coll_tree_state2str(tree->state)); |
| xfree(nodename); |
| goto error; |
| } |
| break; |
| default: |
| /* should not happen in normal workflow */ |
| PMIXP_ERROR("%p: unknown collective state %s", |
| coll, pmixp_coll_tree_state2str(tree->state)); |
| /* collective is spoiled, reset state */ |
| tree->state = PMIXP_COLL_TREE_SYNC; |
| goto error2; |
| } |
| |
| /* Because of possible timeouts/delays in transmission we |
| * can receive a contribution multiple times. Avoid duplications |
| * by checking our records. */ |
| if (tree->contrib_chld[chld_id]) { |
| char *nodename = pmixp_info_job_host(peerid); |
| /* Normally there is at most one duplicate; more than that |
| * indicates transmission skew - ignore. |
| * NOTE: this output is not on the critical path - |
| * don't preprocess it out */ |
| PMIXP_DEBUG("%p: multiple contribs from %s:%d(x:%d)", |
| coll, nodename, peerid, chld_id); |
| /* this is duplication, skip. */ |
| xfree(nodename); |
| goto proceed; |
| } |
| |
| data_src = get_buf_data(buf) + get_buf_offset(buf); |
| size = remaining_buf(buf); |
| if (try_grow_buf_remaining(tree->ufwd_buf, size)) |
| goto error; |
| data_dst = get_buf_data(tree->ufwd_buf) + |
| get_buf_offset(tree->ufwd_buf); |
| memcpy(data_dst, data_src, size); |
| set_buf_offset(tree->ufwd_buf, get_buf_offset(tree->ufwd_buf) + size); |
| |
| /* increase number of individual contributions */ |
| tree->contrib_chld[chld_id] = true; |
| /* increase number of total contributions */ |
| tree->contrib_children++; |
| |
| proceed: |
| _progress_coll_tree(coll); |
| |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: finish nodeid=%u, child=%d, state=%s", |
| coll, peerid, chld_id, |
| pmixp_coll_tree_state2str(tree->state)); |
| #endif |
| /* unlock the structure */ |
| slurm_mutex_unlock(&coll->lock); |
| |
| return SLURM_SUCCESS; |
| error: |
| pmixp_coll_log(coll); |
| _reset_coll(coll); |
| error2: |
| slurm_kill_job_step(pmixp_info_jobid(), |
| pmixp_info_stepid(), SIGKILL, 0); |
| /* unlock the structure */ |
| slurm_mutex_unlock(&coll->lock); |
| |
| return SLURM_ERROR; |
| } |
| |
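| /* |
| * Accept the aggregated (fan-out) data coming from our parent (direct |
| * connect) or from the tree root (Slurm API broadcast): validate the |
| * sender and sequence number, store the payload in the downward buffer |
| * and try to progress the collective. |
| */ |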
| int pmixp_coll_tree_parent(pmixp_coll_t *coll, uint32_t peerid, uint32_t seq, |
| buf_t *buf) |
| { |
| pmixp_coll_tree_t *tree = NULL; |
| char *data_src = NULL, *data_dst = NULL; |
| uint32_t size; |
| int expected_peerid; |
| |
| /* lock the structure */ |
| slurm_mutex_lock(&coll->lock); |
| tree = &coll->state.tree; |
| |
| if (pmixp_info_srv_direct_conn()) { |
| expected_peerid = tree->prnt_peerid; |
| } else { |
| expected_peerid = tree->root_peerid; |
| } |
| |
| if (expected_peerid != peerid) { |
| char *nodename = pmixp_info_job_host(peerid); |
| /* protect ourselves if we are running with no asserts */ |
| PMIXP_ERROR("%p: parent contrib from bad nodeid=%s:%u, expect=%d", |
| coll, nodename, peerid, expected_peerid); |
| xfree(nodename); |
| goto proceed; |
| } |
| |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: contrib/rem nodeid=%u: state=%s, size=%u", |
| coll, peerid, pmixp_coll_tree_state2str(tree->state), |
| remaining_buf(buf)); |
| #endif |
| |
| switch (tree->state) { |
| case PMIXP_COLL_TREE_SYNC: |
| case PMIXP_COLL_TREE_COLLECT: |
| /* It looks like a retransmission attempt when remote side |
| * identified transmission failure, but we actually successfully |
| * received the message */ |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: prev contrib nodeid=%u: seq=%u, cur_seq=%u, state=%s", |
| coll, peerid, seq, coll->seq, |
| pmixp_coll_tree_state2str(tree->state)); |
| #endif |
| /* sanity check */ |
| if ((coll->seq - 1) != seq) { |
| /* FATAL: should not happen in normal workflow */ |
| char *nodename = pmixp_info_job_host(peerid); |
| PMIXP_ERROR("%p: unexpected from %s:%d: seq = %d, coll->seq = %d, state=%s", |
| coll, nodename, peerid, seq, coll->seq, |
| pmixp_coll_tree_state2str(tree->state)); |
| xfree(nodename); |
| goto error; |
| } |
| goto proceed; |
| case PMIXP_COLL_TREE_UPFWD_WSC: { |
| /* we are not actually ready to receive this contribution as |
| * our own upward send has not completed yet. |
| * This should not happen as SAPI (Slurm API) sends are blocking |
| * and we should transit to PMIXP_COLL_TREE_UPFWD_WPC immediately */ |
| /* FATAL: should not happen in normal workflow */ |
| char *nodename = pmixp_info_job_host(peerid); |
| PMIXP_ERROR("%p: unexpected from %s:%d: seq = %d, coll->seq = %d, state=%s", |
| coll, nodename, peerid, seq, coll->seq, |
| pmixp_coll_tree_state2str(tree->state)); |
| xfree(nodename); |
| goto error; |
| } |
| case PMIXP_COLL_TREE_UPFWD: |
| case PMIXP_COLL_TREE_UPFWD_WPC: |
| /* we were waiting for this */ |
| break; |
| case PMIXP_COLL_TREE_DOWNFWD: |
| /* It looks like a retransmission attempt when remote side |
| * identified transmission failure, but we actually successfully |
| * received the message */ |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: double contrib nodeid=%u seq=%u, cur_seq=%u, state=%s", |
| coll, peerid, seq, coll->seq, |
| pmixp_coll_tree_state2str(tree->state)); |
| #endif |
| /* sanity check */ |
| if (coll->seq != seq) { |
| char *nodename = pmixp_info_job_host(peerid); |
| /* FATAL: should not happen in normal workflow */ |
| PMIXP_ERROR("%p: unexpected contrib from %s:%d: seq = %d, coll->seq = %d, state=%s", |
| coll, nodename, peerid, seq, coll->seq, |
| pmixp_coll_tree_state2str(tree->state)); |
| xfree(nodename); |
| goto error; |
| } |
| goto proceed; |
| default: |
| /* should not happen in normal workflow */ |
| PMIXP_ERROR("%p: unknown collective state %s", |
| coll, pmixp_coll_tree_state2str(tree->state)); |
| /* collective is spoiled, reset state */ |
| tree->state = PMIXP_COLL_TREE_SYNC; |
| goto error2; |
| } |
| |
| /* Because of possible timeouts/delays in transmission we |
| * can receive a contribution second time. Avoid duplications |
| * by checking our records. */ |
| if (tree->contrib_prnt) { |
| char *nodename = pmixp_info_job_host(peerid); |
| /* Normally there is at most one duplicate; more than that |
| * indicates transmission skew - ignore. |
| * NOTE: this output is not on the critical path - |
| * don't preprocess it out */ |
| PMIXP_DEBUG("%p: multiple contributions from parent %s:%d", |
| coll, nodename, peerid); |
| xfree(nodename); |
| /* this is duplication, skip. */ |
| goto proceed; |
| } |
| tree->contrib_prnt = true; |
| |
| data_src = get_buf_data(buf) + get_buf_offset(buf); |
| size = remaining_buf(buf); |
| if (try_grow_buf_remaining(tree->dfwd_buf, size)) |
| goto error; |
| data_dst = get_buf_data(tree->dfwd_buf) + |
| get_buf_offset(tree->dfwd_buf); |
| memcpy(data_dst, data_src, size); |
| set_buf_offset(tree->dfwd_buf, |
| get_buf_offset(tree->dfwd_buf) + size); |
| proceed: |
| _progress_coll_tree(coll); |
| |
| #ifdef PMIXP_COLL_DEBUG |
| PMIXP_DEBUG("%p: finish: nodeid=%u, state=%s", |
| coll, peerid, pmixp_coll_tree_state2str(tree->state)); |
| #endif |
| /* unlock the structure */ |
| slurm_mutex_unlock(&coll->lock); |
| |
| return SLURM_SUCCESS; |
| error: |
| pmixp_coll_log(coll); |
| _reset_coll(coll); |
| error2: |
| slurm_kill_job_step(pmixp_info_jobid(), |
| pmixp_info_stepid(), SIGKILL, 0); |
| slurm_mutex_unlock(&coll->lock); |
| |
| return SLURM_ERROR; |
| } |
| |
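| /* |
| * Timeout handling: if the collective has been active longer than the |
| * configured PMIx timeout, report the error to libpmix, log the state |
| * and drop the collective. |
| */ |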
| void pmixp_coll_tree_reset_if_to(pmixp_coll_t *coll, time_t ts) |
| { |
| pmixp_coll_tree_t *tree = NULL; |
| |
| /* lock the structure */ |
| slurm_mutex_lock(&coll->lock); |
| tree = &coll->state.tree; |
| |
| if (PMIXP_COLL_TREE_SYNC == tree->state) { |
| goto unlock; |
| } |
| |
| if (ts - coll->ts > pmixp_info_timeout()) { |
| /* respond to the libpmix */ |
| pmixp_coll_localcb_nodata(coll, PMIX_ERR_TIMEOUT); |
| |
| /* report the timeout event */ |
| PMIXP_ERROR("%p: collective timeout seq=%d", coll, coll->seq); |
| pmixp_coll_log(coll); |
| /* drop the collective */ |
| _reset_coll(coll); |
| } |
| unlock: |
| /* unlock the structure */ |
| slurm_mutex_unlock(&coll->lock); |
| } |
| |
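| /* |
| * Dump the full state of a tree collective (contribution flags, tree |
| * topology, send statuses and buffer usage) to the error log. |
| */ |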
| void pmixp_coll_tree_log(pmixp_coll_t *coll) |
| { |
| int i; |
| pmixp_coll_tree_t *tree = &coll->state.tree; |
| char *nodename; |
| |
| PMIXP_ERROR("%p: %s state seq=%d contribs: loc=%d/prnt=%d/child=%u", |
| coll, pmixp_coll_type2str(coll->type), coll->seq, |
| tree->contrib_local, tree->contrib_prnt, |
| tree->contrib_children); |
| nodename = pmixp_info_job_host(coll->my_peerid); |
| PMIXP_ERROR("my peerid: %d:%s", coll->my_peerid, nodename); |
| xfree(nodename); |
| nodename = pmixp_info_job_host(tree->root_peerid); |
| PMIXP_ERROR("root host: %d:%s", tree->root_peerid, nodename); |
| xfree(nodename); |
| if (tree->prnt_peerid >= 0) { |
| PMIXP_ERROR("prnt host: %d:%s", |
| tree->prnt_peerid, tree->prnt_host); |
| PMIXP_ERROR("prnt contrib:"); |
| PMIXP_ERROR("\t [%d:%s] %s", tree->prnt_peerid, tree->prnt_host, |
| tree->contrib_prnt ? "true" : "false"); |
| } |
| if (tree->chldrn_cnt) { |
| char *done_contrib = NULL, *wait_contrib = NULL; |
| hostlist_t *hl_done_contrib = NULL, |
| *hl_wait_contrib = NULL, **tmp_list; |
| |
| PMIXP_ERROR("child contribs [%d]:", tree->chldrn_cnt); |
| for (i = 0; i < tree->chldrn_cnt; i++) { |
| nodename = pmixp_info_job_host(tree->chldrn_ids[i]); |
| tmp_list = tree->contrib_chld[i] ? |
| &hl_done_contrib : &hl_wait_contrib; |
| |
| if (!*tmp_list) |
| *tmp_list = hostlist_create(nodename); |
| else |
| hostlist_push_host(*tmp_list, nodename); |
| |
| xfree(nodename); |
| } |
| if (hl_done_contrib) { |
| done_contrib = |
| slurm_hostlist_ranged_string_xmalloc( |
| hl_done_contrib); |
| FREE_NULL_HOSTLIST(hl_done_contrib); |
| } |
| |
| if (hl_wait_contrib) { |
| wait_contrib = |
| slurm_hostlist_ranged_string_xmalloc( |
| hl_wait_contrib); |
| FREE_NULL_HOSTLIST(hl_wait_contrib); |
| } |
| PMIXP_ERROR("\t done contrib: %s", |
| done_contrib ? done_contrib : "-"); |
| PMIXP_ERROR("\t wait contrib: %s", |
| wait_contrib ? wait_contrib : "-"); |
| xfree(done_contrib); |
| xfree(wait_contrib); |
| } |
| PMIXP_ERROR("status: coll=%s upfw=%s dfwd=%s", |
| pmixp_coll_tree_state2str(tree->state), |
| pmixp_coll_tree_sndstatus2str(tree->ufwd_status), |
| pmixp_coll_tree_sndstatus2str(tree->dfwd_status)); |
| PMIXP_ERROR("dfwd status: dfwd_cb_cnt=%u, dfwd_cb_wait=%u", |
| tree->dfwd_cb_cnt, tree->dfwd_cb_wait); |
| PMIXP_ERROR("bufs (offset/size): upfw %u/%u, dfwd %u/%u", |
| get_buf_offset(tree->ufwd_buf), size_buf(tree->ufwd_buf), |
| get_buf_offset(tree->dfwd_buf), size_buf(tree->dfwd_buf)); |
| } |