| /*****************************************************************************\ |
| * forward.c - forward RPCs through hierarchical slurmd communications |
| ***************************************************************************** |
| * Copyright (C) 2006-2007 The Regents of the University of California. |
| * Copyright (C) 2008 Lawrence Livermore National Security. |
| *  Produced at Lawrence Livermore National Laboratory (cf. DISCLAIMER). |
| * Written by Danny Auble <auble1@llnl.gov>. |
| * CODE-OCEC-09-009. All rights reserved. |
| * |
| * This file is part of SLURM, a resource management program. |
| * For details, see <http://slurm.schedmd.com/>. |
| * Please also read the included file: DISCLAIMER. |
| * |
| * SLURM is free software; you can redistribute it and/or modify it under |
| * the terms of the GNU General Public License as published by the Free |
| * Software Foundation; either version 2 of the License, or (at your option) |
| * any later version. |
| * |
| * In addition, as a special exception, the copyright holders give permission |
| * to link the code of portions of this program with the OpenSSL library under |
| * certain conditions as described in each individual source file, and |
| * distribute linked combinations including the two. You must obey the GNU |
| * General Public License in all respects for all of the code used other than |
| * OpenSSL. If you modify file(s) with this exception, you may extend this |
| * exception to your version of the file(s), but you are not obligated to do |
| * so. If you do not wish to do so, delete this exception statement from your |
| * version. If you delete this exception statement from all source files in |
| * the program, then also delete it here. |
| * |
| * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY |
| * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| * details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with SLURM; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| \*****************************************************************************/ |
| |
| #ifdef HAVE_CONFIG_H |
| # include "config.h" |
| #endif |
| |
| #include <errno.h> |
| #include <stdio.h> |
| #include <unistd.h> |
| #include <stdlib.h> |
| #include <sys/types.h> |
| |
| #include "slurm/slurm.h" |
| |
| #include "src/common/forward.h" |
| #include "src/common/xmalloc.h" |
| #include "src/common/xstring.h" |
| #include "src/common/slurm_auth.h" |
| #include "src/common/read_config.h" |
| #include "src/common/slurm_protocol_interface.h" |
| |
| #ifdef WITH_PTHREADS |
| # include <pthread.h> |
| #endif /* WITH_PTHREADS */ |
| |
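| /* Maximum number of pthread_create() attempts before giving up */ |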
| #define MAX_RETRIES 3 |
| |
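| /* |
| * Per-thread state for tree forwarding: each _fwd_tree_thread gets one |
| * fwd_tree_t describing the branch of hosts (tree_hl) it must reach and |
| * the shared list, mutex and condition used to report results back to |
| * start_msg_tree(). |
| */ |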
| typedef struct { |
| pthread_cond_t *notify; |
| int *p_thr_count; |
| slurm_msg_t *orig_msg; |
| List ret_list; |
| int timeout; |
| hostlist_t tree_hl; |
| pthread_mutex_t *tree_mutex; |
| } fwd_tree_t; |
| |
| void _destroy_tree_fwd(fwd_tree_t *fwd_tree) |
| { |
| if (fwd_tree) { |
| if (fwd_tree->tree_hl) |
| hostlist_destroy(fwd_tree->tree_hl); |
| |
| /* |
| * Lock and decrement the thread counter; start_msg_tree waits |
| * for the count to reach zero before exiting its main loop. |
| */ |
| slurm_mutex_lock(fwd_tree->tree_mutex); |
| (*(fwd_tree->p_thr_count))--; |
| pthread_cond_signal(fwd_tree->notify); |
| slurm_mutex_unlock(fwd_tree->tree_mutex); |
| |
| xfree(fwd_tree); |
| } |
| } |
| |
| void *_forward_thread(void *arg) |
| { |
| forward_msg_t *fwd_msg = (forward_msg_t *)arg; |
| Buf buffer = init_buf(fwd_msg->buf_len); |
| List ret_list = NULL; |
| slurm_fd_t fd = -1; |
| ret_data_info_t *ret_data_info = NULL; |
| char *name = NULL; |
| hostlist_t hl = hostlist_create(fwd_msg->header.forward.nodelist); |
| slurm_addr_t addr; |
| char *buf = NULL; |
| int steps = 0; |
| int start_timeout = fwd_msg->timeout; |
| |
| /* repeat until we are sure the message was sent */ |
| while ((name = hostlist_shift(hl))) { |
| if (slurm_conf_get_addr(name, &addr) == SLURM_ERROR) { |
| error("forward_thread: can't find address for host " |
| "%s, check slurm.conf", name); |
| slurm_mutex_lock(fwd_msg->forward_mutex); |
| mark_as_failed_forward(&fwd_msg->ret_list, name, |
| SLURM_UNKNOWN_FORWARD_ADDR); |
| free(name); |
| if (hostlist_count(hl) > 0) { |
| slurm_mutex_unlock(fwd_msg->forward_mutex); |
| continue; |
| } |
| goto cleanup; |
| } |
| if ((fd = slurm_open_msg_conn(&addr)) < 0) { |
| error("forward_thread to %s: %m", name); |
| |
| slurm_mutex_lock(fwd_msg->forward_mutex); |
| mark_as_failed_forward( |
| &fwd_msg->ret_list, name, |
| SLURM_COMMUNICATIONS_CONNECTION_ERROR); |
| free(name); |
| if (hostlist_count(hl) > 0) { |
| slurm_mutex_unlock(fwd_msg->forward_mutex); |
| continue; |
| } |
| goto cleanup; |
| } |
| buf = hostlist_ranged_string_xmalloc(hl); |
| |
| xfree(fwd_msg->header.forward.nodelist); |
| fwd_msg->header.forward.nodelist = buf; |
| fwd_msg->header.forward.cnt = hostlist_count(hl); |
| /* info("sending %d forwards (%s) to %s", */ |
| /* fwd_msg->header.forward.cnt, */ |
| /* fwd_msg->header.forward.nodelist, name); */ |
| if (fwd_msg->header.forward.nodelist[0]) { |
| debug3("forward: send to %s along with %s", |
| name, fwd_msg->header.forward.nodelist); |
| } else |
| debug3("forward: send to %s ", name); |
| |
| pack_header(&fwd_msg->header, buffer); |
| |
| /* add forward data to buffer */ |
| if (remaining_buf(buffer) < fwd_msg->buf_len) { |
| buffer->size += (fwd_msg->buf_len + BUF_SIZE); |
| xrealloc(buffer->head, buffer->size); |
| } |
| if (fwd_msg->buf_len) { |
| memcpy(&buffer->head[buffer->processed], |
| fwd_msg->buf, fwd_msg->buf_len); |
| buffer->processed += fwd_msg->buf_len; |
| } |
| |
| /* |
| * forward message |
| */ |
| if (_slurm_msg_sendto(fd, |
| get_buf_data(buffer), |
| get_buf_offset(buffer), |
| SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) < 0) { |
| error("forward_thread: slurm_msg_sendto: %m"); |
| |
| slurm_mutex_lock(fwd_msg->forward_mutex); |
| mark_as_failed_forward(&fwd_msg->ret_list, name, |
| errno); |
| free(name); |
| if (hostlist_count(hl) > 0) { |
| free_buf(buffer); |
| buffer = init_buf(fwd_msg->buf_len); |
| slurm_mutex_unlock(fwd_msg->forward_mutex); |
| slurm_close_accepted_conn(fd); |
| fd = -1; |
| continue; |
| } |
| goto cleanup; |
| } |
| |
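| /* |
| * No responses are expected for these message types, so just |
| * record an entry for this host and every remaining host, |
| * then finish. |
| */ |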
| if ((fwd_msg->header.msg_type == REQUEST_SHUTDOWN) || |
| (fwd_msg->header.msg_type == REQUEST_RECONFIGURE) || |
| (fwd_msg->header.msg_type == REQUEST_REBOOT_NODES)) { |
| slurm_mutex_lock(fwd_msg->forward_mutex); |
| ret_data_info = xmalloc(sizeof(ret_data_info_t)); |
| list_push(fwd_msg->ret_list, ret_data_info); |
| ret_data_info->node_name = xstrdup(name); |
| free(name); |
| while ((name = hostlist_shift(hl))) { |
| ret_data_info = |
| xmalloc(sizeof(ret_data_info_t)); |
| list_push(fwd_msg->ret_list, ret_data_info); |
| ret_data_info->node_name = xstrdup(name); |
| free(name); |
| } |
| goto cleanup; |
| } |
| |
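| /* |
| * If this child must forward the message further, extend the |
| * timeout in proportion to the number of additional hops |
| * (steps) implied by the remaining host count and tree width. |
| */ |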
| if (fwd_msg->header.forward.cnt > 0) { |
| static int message_timeout = -1; |
| if (message_timeout < 0) |
| message_timeout = |
| slurm_get_msg_timeout() * 1000; |
| steps = (fwd_msg->header.forward.cnt+1) / |
| slurm_get_tree_width(); |
| fwd_msg->timeout = (message_timeout*steps); |
| /* info("got %d * %d = %d", message_timeout, steps, fwd_msg->timeout); */ |
| steps++; |
| fwd_msg->timeout += (start_timeout*steps); |
| /* info("now + %d*%d = %d", start_timeout, steps, fwd_msg->timeout); */ |
| } |
| |
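| /* |
| * Expect forward.cnt + 1 responses: one from the direct child |
| * plus one per node it forwarded to. |
| */ |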
| ret_list = slurm_receive_msgs(fd, steps, fwd_msg->timeout); |
| /* info("sent %d forwards got %d back", */ |
| /* fwd_msg->header.forward.cnt, list_count(ret_list)); */ |
| |
| if (!ret_list || (fwd_msg->header.forward.cnt != 0 |
| && list_count(ret_list) <= 1)) { |
| slurm_mutex_lock(fwd_msg->forward_mutex); |
| mark_as_failed_forward(&fwd_msg->ret_list, name, |
| errno); |
| free(name); |
| if (ret_list) |
| list_destroy(ret_list); |
| if (hostlist_count(hl) > 0) { |
| free_buf(buffer); |
| buffer = init_buf(fwd_msg->buf_len); |
| slurm_mutex_unlock(fwd_msg->forward_mutex); |
| slurm_close_accepted_conn(fd); |
| fd = -1; |
| continue; |
| } |
| goto cleanup; |
| } else if ((fwd_msg->header.forward.cnt+1) |
| != list_count(ret_list)) { |
| /* This should never be reached, since the check above |
| should catch the failed forwards and pipe them back |
| down, but it is kept so we never have to worry about |
| a locked mutex. */ |
| ListIterator itr = NULL; |
| char *tmp = NULL; |
| int first_node_found = 0; |
| hostlist_iterator_t host_itr |
| = hostlist_iterator_create(hl); |
| error("We shouldn't be here. We forwarded to %d " |
| "but only got %d back", |
| (fwd_msg->header.forward.cnt+1), |
| list_count(ret_list)); |
| while ((tmp = hostlist_next(host_itr))) { |
| int node_found = 0; |
| itr = list_iterator_create(ret_list); |
| while ((ret_data_info = list_next(itr))) { |
| if (!ret_data_info->node_name) { |
| first_node_found = 1; |
| ret_data_info->node_name = |
| xstrdup(name); |
| } |
| if (!strcmp(tmp, |
| ret_data_info->node_name)) { |
| node_found = 1; |
| break; |
| } |
| } |
| list_iterator_destroy(itr); |
| if (!node_found) { |
| mark_as_failed_forward( |
| &fwd_msg->ret_list, |
| tmp, |
| SLURM_COMMUNICATIONS_CONNECTION_ERROR); |
| } |
| free(tmp); |
| } |
| hostlist_iterator_destroy(host_itr); |
| if (!first_node_found) { |
| mark_as_failed_forward(&fwd_msg->ret_list, |
| name, |
| SLURM_COMMUNICATIONS_CONNECTION_ERROR); |
| } |
| } |
| break; |
| } |
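| /* Transfer the responses we received into the shared ret_list. */ |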
| slurm_mutex_lock(fwd_msg->forward_mutex); |
| if (ret_list) { |
| while ((ret_data_info = list_pop(ret_list)) != NULL) { |
| if (!ret_data_info->node_name) { |
| ret_data_info->node_name = xstrdup(name); |
| } |
| list_push(fwd_msg->ret_list, ret_data_info); |
| debug3("got response from %s", |
| ret_data_info->node_name); |
| } |
| list_destroy(ret_list); |
| } |
| free(name); |
| cleanup: |
| if ((fd >= 0) && slurm_close_accepted_conn(fd) < 0) |
| error ("close(%d): %m", fd); |
| hostlist_destroy(hl); |
| destroy_forward(&fwd_msg->header.forward); |
| free_buf(buffer); |
| pthread_cond_signal(fwd_msg->notify); |
| slurm_mutex_unlock(fwd_msg->forward_mutex); |
| |
| return (NULL); |
| } |
| |
| void *_fwd_tree_thread(void *arg) |
| { |
| fwd_tree_t *fwd_tree = (fwd_tree_t *)arg; |
| List ret_list = NULL; |
| char *name = NULL; |
| char *buf = NULL; |
| slurm_msg_t send_msg; |
| |
| slurm_msg_t_init(&send_msg); |
| send_msg.msg_type = fwd_tree->orig_msg->msg_type; |
| send_msg.data = fwd_tree->orig_msg->data; |
| send_msg.protocol_version = fwd_tree->orig_msg->protocol_version; |
| |
| /* repeat until we are sure the message was sent */ |
| while ((name = hostlist_shift(fwd_tree->tree_hl))) { |
| if (slurm_conf_get_addr(name, &send_msg.address) |
| == SLURM_ERROR) { |
| error("fwd_tree_thread: can't find address for host " |
| "%s, check slurm.conf", name); |
| slurm_mutex_lock(fwd_tree->tree_mutex); |
| mark_as_failed_forward(&fwd_tree->ret_list, name, |
| SLURM_UNKNOWN_FORWARD_ADDR); |
| pthread_cond_signal(fwd_tree->notify); |
| slurm_mutex_unlock(fwd_tree->tree_mutex); |
| free(name); |
| |
| continue; |
| } |
| |
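| /* |
| * Whatever remains of this branch after the direct child is |
| * handed to that child as its forward list. |
| */ |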
| send_msg.forward.timeout = fwd_tree->timeout; |
| if ((send_msg.forward.cnt = hostlist_count(fwd_tree->tree_hl))) { |
| buf = hostlist_ranged_string_xmalloc( |
| fwd_tree->tree_hl); |
| send_msg.forward.nodelist = buf; |
| } else |
| send_msg.forward.nodelist = NULL; |
| |
| if (send_msg.forward.nodelist && send_msg.forward.nodelist[0]) { |
| debug3("Tree sending to %s along with %s", |
| name, send_msg.forward.nodelist); |
| } else |
| debug3("Tree sending to %s", name); |
| |
| ret_list = slurm_send_addr_recv_msgs(&send_msg, name, |
| fwd_tree->timeout); |
| |
| xfree(send_msg.forward.nodelist); |
| |
| if (ret_list) { |
| int ret_cnt = list_count(ret_list); |
| /* This is most common if a slurmd is running |
| an older version of Slurm than the |
| originator of the message. |
| */ |
| if ((ret_cnt <= send_msg.forward.cnt) && |
| (errno != SLURM_COMMUNICATIONS_CONNECTION_ERROR)) { |
| error("fwd_tree_thread: %s failed to forward " |
| "the message, expecting %d ret got only " |
| "%d", |
| name, send_msg.forward.cnt + 1, ret_cnt); |
| if (ret_cnt > 1) { /* not likely */ |
| ret_data_info_t *ret_data_info = NULL; |
| ListIterator itr = |
| list_iterator_create(ret_list); |
| while ((ret_data_info = |
| list_next(itr))) { |
| if (strcmp(ret_data_info-> |
| node_name, name)) |
| hostlist_delete_host( |
| fwd_tree-> |
| tree_hl, |
| ret_data_info-> |
| node_name); |
| } |
| list_iterator_destroy(itr); |
| } |
| } |
| |
| slurm_mutex_lock(fwd_tree->tree_mutex); |
| list_transfer(fwd_tree->ret_list, ret_list); |
| pthread_cond_signal(fwd_tree->notify); |
| slurm_mutex_unlock(fwd_tree->tree_mutex); |
| list_destroy(ret_list); |
| /* try next node */ |
| if (ret_cnt <= send_msg.forward.cnt) { |
| free(name); |
| continue; |
| } |
| } else { |
| /* This should never happen (when this was |
| * written, slurm_send_addr_recv_msgs always |
| * returned a list). */ |
| error("fwd_tree_thread: no return list given from " |
| "slurm_send_addr_recv_msgs spawned for %s", |
| name); |
| slurm_mutex_lock(fwd_tree->tree_mutex); |
| mark_as_failed_forward( |
| &fwd_tree->ret_list, name, |
| SLURM_COMMUNICATIONS_CONNECTION_ERROR); |
| pthread_cond_signal(fwd_tree->notify); |
| slurm_mutex_unlock(fwd_tree->tree_mutex); |
| free(name); |
| |
| continue; |
| } |
| |
| free(name); |
| |
| /* check for error and try again */ |
| if (errno == SLURM_COMMUNICATIONS_CONNECTION_ERROR) |
| continue; |
| |
| break; |
| } |
| |
| _destroy_tree_fwd(fwd_tree); |
| |
| return NULL; |
| } |
| |
| /* |
| * forward_init - initialize forward structure |
| * IN: forward - forward_t * - struct to store forward info |
| * IN: from - forward_t * - (OPTIONAL) can be NULL; if given, used to |
| * initialize "forward" to this state |
| * RET: VOID |
| */ |
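| /* |
| * Illustrative usage (sketch, not part of the original source): |
| * forward_t fwd; |
| * forward_init(&fwd, NULL); -- fwd now holds cnt = 0, timeout = 0, |
| * nodelist = NULL, init = FORWARD_INIT |
| */ |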
| extern void forward_init(forward_t *forward, forward_t *from) |
| { |
| if (from && from->init == FORWARD_INIT) { |
| forward->cnt = from->cnt; |
| forward->timeout = from->timeout; |
| forward->nodelist = from->nodelist; |
| forward->init = from->init; |
| } else { |
| forward->cnt = 0; |
| forward->timeout = 0; |
| forward->nodelist = NULL; |
| forward->init = FORWARD_INIT; |
| } |
| } |
| |
| /* |
| * forward_msg - logic to forward a message which has been received and |
| * accumulate the return codes from the processes getting the |
| * forwarded message |
| * |
| * IN: forward_struct - forward_struct_t * - holds information about message |
| * that needs to be forwarded to |
| * children processes |
| * IN: header - header_t - header from message that came in |
| * needing to be forwarded. |
| * RET: SLURM_SUCCESS - int |
| */ |
| extern int forward_msg(forward_struct_t *forward_struct, |
| header_t *header) |
| { |
| int j = 0; |
| int retries = 0; |
| forward_msg_t *forward_msg = NULL; |
| int thr_count = 0; |
| int *span = set_span(header->forward.cnt, 0); |
| hostlist_t hl = NULL; |
| hostlist_t forward_hl = NULL; |
| char *name = NULL; |
| |
| if (!forward_struct->ret_list) { |
| error("didn't get a ret_list from forward_struct"); |
| xfree(span); |
| return SLURM_ERROR; |
| } |
| hl = hostlist_create(header->forward.nodelist); |
| hostlist_uniq(hl); |
| |
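| /* |
| * Each iteration takes one direct child off the list plus up to |
| * span[thr_count] additional hosts to be relayed through it, then |
| * spawns a detached _forward_thread for that branch. |
| */ |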
| while ((name = hostlist_shift(hl))) { |
| pthread_attr_t attr_agent; |
| pthread_t thread_agent; |
| char *buf = NULL; |
| |
| slurm_attr_init(&attr_agent); |
| if (pthread_attr_setdetachstate |
| (&attr_agent, PTHREAD_CREATE_DETACHED)) |
| error("pthread_attr_setdetachstate error %m"); |
| |
| forward_msg = &forward_struct->forward_msg[thr_count]; |
| forward_msg->ret_list = forward_struct->ret_list; |
| |
| forward_msg->timeout = forward_struct->timeout; |
| |
| if (forward_msg->timeout <= 0) { |
| /* convert secs to msec */ |
| forward_msg->timeout = slurm_get_msg_timeout() * 1000; |
| } |
| |
| forward_msg->notify = &forward_struct->notify; |
| forward_msg->forward_mutex = &forward_struct->forward_mutex; |
| forward_msg->buf_len = forward_struct->buf_len; |
| forward_msg->buf = forward_struct->buf; |
| |
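| /* Copy the relevant fields of the incoming header into this |
| * thread's outgoing header. */ |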
| memcpy(&forward_msg->header.orig_addr, |
| &header->orig_addr, |
| sizeof(slurm_addr_t)); |
| |
| forward_msg->header.version = header->version; |
| forward_msg->header.flags = header->flags; |
| forward_msg->header.msg_type = header->msg_type; |
| forward_msg->header.body_length = header->body_length; |
| forward_msg->header.ret_list = NULL; |
| forward_msg->header.ret_cnt = 0; |
| |
| forward_hl = hostlist_create(name); |
| free(name); |
| for (j = 0; j < span[thr_count]; j++) { |
| name = hostlist_shift(hl); |
| if (!name) |
| break; |
| hostlist_push_host(forward_hl, name); |
| free(name); |
| } |
| |
| buf = hostlist_ranged_string_xmalloc(forward_hl); |
| hostlist_destroy(forward_hl); |
| forward_init(&forward_msg->header.forward, NULL); |
| forward_msg->header.forward.nodelist = buf; |
| while (pthread_create(&thread_agent, &attr_agent, |
| _forward_thread, |
| (void *)forward_msg)) { |
| error("pthread_create error %m"); |
| if (++retries > MAX_RETRIES) |
| fatal("Can't create pthread"); |
| usleep(100000); /* sleep and try again */ |
| } |
| slurm_attr_destroy(&attr_agent); |
| thr_count++; |
| } |
| hostlist_destroy(hl); |
| xfree(span); |
| return SLURM_SUCCESS; |
| } |
| |
| /* |
| * start_msg_tree - logic to begin the forward tree and |
| * accumulate the return codes from the processes getting the |
| * forwarded message |
| * |
| * IN: hl - hostlist_t - list of every node to send message to |
| * IN: msg - slurm_msg_t - message to send. |
| * IN: timeout - int - how long to wait in milliseconds. |
| * RET List - List containing the responses of the children |
| * (if any) we forwarded the message to. List |
| * containing type (ret_data_info_t). |
| */ |
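| /* |
| * Illustrative usage (sketch, not part of the original source; |
| * error handling omitted, node names hypothetical): |
| * hostlist_t hl = hostlist_create("node[1-4]"); |
| * List ret = start_msg_tree(hl, &msg, 0); -- 0 => default timeout |
| * ... examine the ret_data_info_t entries in ret ... |
| * list_destroy(ret); |
| * hostlist_destroy(hl); |
| */ |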
| extern List start_msg_tree(hostlist_t hl, slurm_msg_t *msg, int timeout) |
| { |
| int *span = NULL; |
| fwd_tree_t *fwd_tree = NULL; |
| pthread_mutex_t tree_mutex; |
| pthread_cond_t notify; |
| int j = 0, count = 0; |
| List ret_list = NULL; |
| char *name = NULL; |
| int thr_count = 0; |
| int host_count = 0; |
| |
| xassert(hl); |
| xassert(msg); |
| |
| hostlist_uniq(hl); |
| host_count = hostlist_count(hl); |
| |
| span = set_span(host_count, 0); |
| |
| slurm_mutex_init(&tree_mutex); |
| pthread_cond_init(¬ify, NULL); |
| |
| ret_list = list_create(destroy_data_info); |
| |
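| /* |
| * As in forward_msg(), peel off one direct child plus up to |
| * span[thr_count] hosts per iteration and hand the branch to a |
| * detached _fwd_tree_thread; thr_count is tracked under |
| * tree_mutex for the waiting loop below. |
| */ |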
| while ((name = hostlist_shift(hl))) { |
| pthread_attr_t attr_agent; |
| pthread_t thread_agent; |
| int retries = 0; |
| |
| slurm_attr_init(&attr_agent); |
| if (pthread_attr_setdetachstate |
| (&attr_agent, PTHREAD_CREATE_DETACHED)) |
| error("pthread_attr_setdetachstate error %m"); |
| |
| fwd_tree = xmalloc(sizeof(fwd_tree_t)); |
| fwd_tree->orig_msg = msg; |
| fwd_tree->ret_list = ret_list; |
| fwd_tree->timeout = timeout; |
| fwd_tree->notify = ¬ify; |
| fwd_tree->p_thr_count = &thr_count; |
| fwd_tree->tree_mutex = &tree_mutex; |
| |
| if (fwd_tree->timeout <= 0) { |
| /* convert secs to msec */ |
| fwd_tree->timeout = slurm_get_msg_timeout() * 1000; |
| } |
| |
| fwd_tree->tree_hl = hostlist_create(name); |
| free(name); |
| for (j = 0; j < span[thr_count]; j++) { |
| name = hostlist_shift(hl); |
| if (!name) |
| break; |
| hostlist_push_host(fwd_tree->tree_hl, name); |
| free(name); |
| } |
| |
| /* |
| * Lock and increment the thread counter. This protects the |
| * start_msg_tree waiting loop, which was originally written as |
| * "while (count < host_count)"; if a fwd thread could not get |
| * all the return codes from its children, that loop deadlocked. |
| */ |
| slurm_mutex_lock(&tree_mutex); |
| thr_count++; |
| slurm_mutex_unlock(&tree_mutex); |
| |
| while (pthread_create(&thread_agent, &attr_agent, |
| _fwd_tree_thread, (void *)fwd_tree)) { |
| error("pthread_create error %m"); |
| if (++retries > MAX_RETRIES) |
| fatal("Can't create pthread"); |
| usleep(100000); /* sleep and try again */ |
| } |
| slurm_attr_destroy(&attr_agent); |
| |
| } |
| xfree(span); |
| |
| slurm_mutex_lock(&tree_mutex); |
| |
| count = list_count(ret_list); |
| debug2("Tree head got back %d looking for %d", count, host_count); |
| while (thr_count > 0) { |
| pthread_cond_wait(¬ify, &tree_mutex); |
| count = list_count(ret_list); |
| debug2("Tree head got back %d", count); |
| } |
| xassert(count >= host_count); /* Tree head did not get all responses, |
| * but no more active fwd threads!*/ |
| slurm_mutex_unlock(&tree_mutex); |
| |
| slurm_mutex_destroy(&tree_mutex); |
| pthread_cond_destroy(¬ify); |
| |
| return ret_list; |
| } |
| |
| /* |
| * mark_as_failed_forward - mark a node as failed and add it to "ret_list" |
| * |
| * IN: ret_list - List * - ret_list to put ret_data_info |
| * IN: node_name - char * - node name that failed |
| * IN: err - int - error code from the attempt |
| * |
| */ |
| extern void mark_as_failed_forward(List *ret_list, char *node_name, int err) |
| { |
| ret_data_info_t *ret_data_info = NULL; |
| |
| debug3("problems with %s", node_name); |
| if (!*ret_list) |
| *ret_list = list_create(destroy_data_info); |
| |
| ret_data_info = xmalloc(sizeof(ret_data_info_t)); |
| ret_data_info->node_name = xstrdup(node_name); |
| ret_data_info->type = RESPONSE_FORWARD_FAILED; |
| ret_data_info->err = err; |
| list_push(*ret_list, ret_data_info); |
| |
| return; |
| } |
| |
| extern void forward_wait(slurm_msg_t * msg) |
| { |
| int count = 0; |
| |
| /* wait for all the other messages on the tree under us */ |
| if (msg->forward_struct) { |
| debug2("looking for %d", msg->forward_struct->fwd_cnt); |
| slurm_mutex_lock(&msg->forward_struct->forward_mutex); |
| count = 0; |
| if (msg->ret_list != NULL) |
| count = list_count(msg->ret_list); |
| |
| debug2("Got back %d", count); |
| while ((count < msg->forward_struct->fwd_cnt)) { |
| pthread_cond_wait(&msg->forward_struct->notify, |
| &msg->forward_struct->forward_mutex); |
| |
| if (msg->ret_list != NULL) { |
| count = list_count(msg->ret_list); |
| } |
| debug2("Got back %d", count); |
| |
| } |
| debug2("Got them all"); |
| slurm_mutex_unlock(&msg->forward_struct->forward_mutex); |
| destroy_forward_struct(msg->forward_struct); |
| msg->forward_struct = NULL; |
| } |
| return; |
| } |
| |
| void destroy_data_info(void *object) |
| { |
| ret_data_info_t *ret_data_info = (ret_data_info_t *)object; |
| if (ret_data_info) { |
| slurm_free_msg_data(ret_data_info->type, |
| ret_data_info->data); |
| xfree(ret_data_info->node_name); |
| xfree(ret_data_info); |
| } |
| } |
| |
| void destroy_forward(forward_t *forward) |
| { |
| if (forward->init == FORWARD_INIT) { |
| xfree(forward->nodelist); |
| forward->init = 0; |
| } else { |
| error("destroy_forward: no init"); |
| } |
| } |
| |
| void destroy_forward_struct(forward_struct_t *forward_struct) |
| { |
| if (forward_struct) { |
| xfree(forward_struct->buf); |
| xfree(forward_struct->forward_msg); |
| slurm_mutex_destroy(&forward_struct->forward_mutex); |
| pthread_cond_destroy(&forward_struct->notify); |
| xfree(forward_struct); |
| } |
| } |