/*****************************************************************************\
 * msg_aggr.c - Message Aggregator for sending messages to the
 *	slurmctld. If a reply is expected, this will also wait for
 *	the reply and hand it back when received.
*****************************************************************************
* Copyright (C) 2015 Bull S. A. S.
* Bull, Rue Jean Jaures, B.P.68, 78340, Les Clayes-sous-Bois.
* Copyright (C) 2015 SchedMD LLC.
* Written by Martin Perry <martin.perry@bull.com>
* Danny Auble <da@schedmd.com>
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include <pthread.h>
#include "slurm/slurm.h"
#include "src/common/macros.h"
#include "src/common/msg_aggr.h"
#include "src/common/read_config.h"
#include "src/common/slurm_auth.h"
#include "src/common/slurm_route.h"
#include "src/common/slurm_protocol_interface.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/slurmd/slurmd/slurmd.h"
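
/* State for the message collection window and the sender thread */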
typedef struct {
pthread_mutex_t aggr_mutex;
pthread_cond_t cond;
	uint64_t debug_flags;
bool max_msgs;
uint64_t max_msg_cnt;
List msg_aggr_list;
List msg_list;
pthread_mutex_t mutex;
slurm_addr_t node_addr;
bool running;
pthread_t thread_id;
uint64_t window;
} msg_collection_type_t;
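
/* Record for a caller waiting on a reply to a collected message */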
typedef struct {
uint16_t msg_index;
void (*resp_callback) (slurm_msg_t *msg);
pthread_cond_t wait_cond;
} msg_aggr_t;
/*
* Message collection data & controls
*/
static msg_collection_type_t msg_collection;
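
/*
 * Free a msg_aggr_t record, destroying its wait condition first.
 * Used as the delete function for msg_collection.msg_aggr_list.
 */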
static void _msg_aggr_free(void *x)
{
msg_aggr_t *object = (msg_aggr_t *)x;
if (object) {
slurm_cond_destroy(&object->wait_cond);
xfree(object);
}
}
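
/*
 * Remove the entry with the given msg_index from
 * msg_collection.msg_aggr_list and return it (not freed), or NULL if
 * no match is found. A msg_index of 0 removes every entry and signals
 * each waiter so no thread is left blocked. Locks aggr_mutex unless
 * the caller already holds it (locked == true).
 */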
static msg_aggr_t *_handle_msg_aggr_ret(uint32_t msg_index, bool locked)
{
msg_aggr_t *msg_aggr;
ListIterator itr;
if (!locked)
slurm_mutex_lock(&msg_collection.aggr_mutex);
itr = list_iterator_create(msg_collection.msg_aggr_list);
while ((msg_aggr = list_next(itr))) {
/* just remove them all */
if (!msg_index) {
/* make sure we don't wait any longer */
slurm_cond_signal(&msg_aggr->wait_cond);
list_remove(itr);
} else if (msg_aggr->msg_index == msg_index) {
list_remove(itr);
break;
}
}
list_iterator_destroy(itr);
if (!locked)
slurm_mutex_unlock(&msg_collection.aggr_mutex);
return msg_aggr;
}
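
/*
 * Forward a composite msg to the backup collector node. If no backup
 * is defined or the send fails, send the msg directly to the
 * controller instead. Returns SLURM_SUCCESS or an error code.
 */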
static int _send_to_backup_collector(slurm_msg_t *msg, int rc)
{
slurm_addr_t *next_dest = NULL;
if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE) {
info("%s: primary %s, getting backup", __func__,
rc ? "can't be reached" : "is null");
}
if ((next_dest = route_g_next_collector_backup())) {
int rc2 = SLURM_SUCCESS;
if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE) {
char addrbuf[100];
slurm_print_slurm_addr(next_dest, addrbuf, 32);
info("%s: *next_dest is %s", __func__, addrbuf);
}
memcpy(&msg->address, next_dest, sizeof(slurm_addr_t));
rc = slurm_send_recv_rc_msg_only_one(msg, &rc2, 0);
if (rc2 != SLURM_SUCCESS && !rc)
rc = rc2;
}
if (!next_dest || (rc != SLURM_SUCCESS)) {
int rc2 = SLURM_SUCCESS;
if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE)
info("%s: backup %s, sending msg to controller",
__func__, rc ? "can't be reached" : "is null");
rc = slurm_send_recv_controller_rc_msg(msg, &rc2,
working_cluster_rec);
if (rc2 != SLURM_SUCCESS && !rc)
rc = rc2;
}
return rc;
}
/*
* Send a msg to the next msg aggregation collector node. If primary
* collector is unavailable or returns error, try backup collector.
* If backup collector is unavailable or returns error, send msg
* directly to controller.
*/
static int _send_to_next_collector(slurm_msg_t *msg)
{
slurm_addr_t *next_dest = NULL;
bool i_am_collector;
int rc = SLURM_SUCCESS;
int rc2 = SLURM_SUCCESS;
if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE)
info("msg aggr: send_to_next_collector: getting primary next "
"collector");
if ((next_dest = route_g_next_collector(&i_am_collector))) {
if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE) {
char addrbuf[100];
slurm_print_slurm_addr(next_dest, addrbuf, 32);
info("msg aggr: send_to_next_collector: *next_dest is "
"%s", addrbuf);
}
memcpy(&msg->address, next_dest, sizeof(slurm_addr_t));
rc = slurm_send_recv_rc_msg_only_one(msg, &rc2, 0);
if (rc2 != SLURM_SUCCESS && !rc)
rc = rc2;
}
if (!next_dest || (rc != SLURM_SUCCESS))
rc = _send_to_backup_collector(msg, rc);
return rc;
}
/*
* _msg_aggregation_sender()
*
* Start and terminate message collection windows.
* Send collected msgs to next collector node or final destination
* at window expiration.
*/
static void * _msg_aggregation_sender(void *arg)
{
struct timeval now;
struct timespec timeout;
slurm_msg_t msg;
composite_msg_t cmp;
slurm_mutex_lock(&msg_collection.mutex);
msg_collection.running = 1;
slurm_cond_signal(&msg_collection.cond);
while (msg_collection.running) {
/* Wait for a new msg to be collected */
slurm_cond_wait(&msg_collection.cond, &msg_collection.mutex);
if (!msg_collection.running &&
!list_count(msg_collection.msg_list))
break;
/* A msg has been collected; start new window */
gettimeofday(&now, NULL);
timeout.tv_sec = now.tv_sec +
(msg_collection.window / MSEC_IN_SEC);
timeout.tv_nsec = (now.tv_usec * NSEC_IN_USEC) +
(NSEC_IN_MSEC * (msg_collection.window % MSEC_IN_SEC));
timeout.tv_sec += timeout.tv_nsec / NSEC_IN_SEC;
timeout.tv_nsec %= NSEC_IN_SEC;
slurm_cond_timedwait(&msg_collection.cond,
&msg_collection.mutex, &timeout);
if (!msg_collection.running &&
!list_count(msg_collection.msg_list))
break;
msg_collection.max_msgs = true;
/* Msg collection window has expired and message collection
* is suspended; now build and send composite msg */
memset(&msg, 0, sizeof(slurm_msg_t));
memset(&cmp, 0, sizeof(composite_msg_t));
memcpy(&cmp.sender, &msg_collection.node_addr,
sizeof(slurm_addr_t));
cmp.msg_list = msg_collection.msg_list;
msg_collection.msg_list =
list_create(slurm_free_comp_msg_list);
msg_collection.max_msgs = false;
slurm_msg_t_init(&msg);
msg.msg_type = MESSAGE_COMPOSITE;
msg.protocol_version = SLURM_PROTOCOL_VERSION;
msg.data = &cmp;
if (_send_to_next_collector(&msg) != SLURM_SUCCESS) {
			error("%s: Unable to send composite msg: %m",
			      __func__);
}
FREE_NULL_LIST(cmp.msg_list);
/* Resume message collection */
slurm_cond_broadcast(&msg_collection.cond);
}
slurm_mutex_unlock(&msg_collection.mutex);
return NULL;
}
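
/*
 * Initialize the message aggregation sender: record this node's
 * address, set the collection window and max message count, create
 * the collection lists, and start the sender thread. Blocks until the
 * sender thread is running. A no-op if the sender is already running
 * or max_msg_cnt <= 1.
 */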
extern void msg_aggr_sender_init(char *host, uint16_t port, uint64_t window,
uint64_t max_msg_cnt)
{
if (msg_collection.running || (max_msg_cnt <= 1))
return;
memset(&msg_collection, 0, sizeof(msg_collection_type_t));
slurm_mutex_init(&msg_collection.aggr_mutex);
slurm_mutex_init(&msg_collection.mutex);
slurm_mutex_lock(&msg_collection.mutex);
slurm_mutex_lock(&msg_collection.aggr_mutex);
slurm_cond_init(&msg_collection.cond, NULL);
slurm_set_addr(&msg_collection.node_addr, port, host);
msg_collection.window = window;
msg_collection.max_msg_cnt = max_msg_cnt;
msg_collection.msg_aggr_list = list_create(_msg_aggr_free);
msg_collection.msg_list = list_create(slurm_free_comp_msg_list);
msg_collection.max_msgs = false;
msg_collection.debug_flags = slurm_get_debug_flags();
slurm_mutex_unlock(&msg_collection.aggr_mutex);
slurm_thread_create(&msg_collection.thread_id,
&_msg_aggregation_sender, NULL);
/* wait for thread to start */
slurm_cond_wait(&msg_collection.cond, &msg_collection.mutex);
slurm_mutex_unlock(&msg_collection.mutex);
}
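
/*
 * Update the collection window, max message count and debug flags of
 * a running sender. Aggregation cannot be started from a reconfig; a
 * restart is required.
 */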
extern void msg_aggr_sender_reconfig(uint64_t window, uint64_t max_msg_cnt)
{
if (msg_collection.running) {
slurm_mutex_lock(&msg_collection.mutex);
msg_collection.window = window;
msg_collection.max_msg_cnt = max_msg_cnt;
msg_collection.debug_flags = slurm_get_debug_flags();
slurm_mutex_unlock(&msg_collection.mutex);
} else if (max_msg_cnt > 1) {
		error("cannot start msg aggregation on a reconfig; "
		      "a restart is needed");
}
}
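
/*
 * Stop the sender thread, wake any threads still waiting for replies,
 * and release all collection resources.
 */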
extern void msg_aggr_sender_fini(void)
{
if (!msg_collection.running)
return;
msg_collection.running = 0;
slurm_mutex_lock(&msg_collection.mutex);
slurm_cond_signal(&msg_collection.cond);
slurm_mutex_unlock(&msg_collection.mutex);
pthread_join(msg_collection.thread_id, NULL);
msg_collection.thread_id = (pthread_t) 0;
slurm_cond_destroy(&msg_collection.cond);
/* signal and clear the waiting list */
slurm_mutex_lock(&msg_collection.aggr_mutex);
_handle_msg_aggr_ret(0, 1);
FREE_NULL_LIST(msg_collection.msg_aggr_list);
slurm_mutex_unlock(&msg_collection.aggr_mutex);
FREE_NULL_LIST(msg_collection.msg_list);
slurm_mutex_destroy(&msg_collection.mutex);
}
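
/*
 * Add a msg to the current collection window, starting a new window
 * if this is the first msg collected and closing the window early
 * once max_msg_cnt is reached. If wait is set, block (up to the
 * configured msg timeout) until msg_aggr_resp() reports that the
 * slurmctld received the msg; resp_callback, if non-NULL, is invoked
 * from msg_aggr_resp() for any response other than RESPONSE_SLURM_RC.
 */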
extern void msg_aggr_add_msg(slurm_msg_t *msg, bool wait,
void (*resp_callback) (slurm_msg_t *msg))
{
int count;
static uint16_t msg_index = 1;
static uint32_t wait_count = 0;
if (!msg_collection.running)
return;
slurm_mutex_lock(&msg_collection.mutex);
if (msg_collection.max_msgs == true) {
slurm_cond_wait(&msg_collection.cond, &msg_collection.mutex);
}
	/* msg_index 0 is reserved as the "remove all" value in
	 * _handle_msg_aggr_ret(), so skip it if the counter wraps */
	if (!msg_index)
		msg_index = 1;
	msg->msg_index = msg_index++;
/* Add msg to message collection */
list_append(msg_collection.msg_list, msg);
count = list_count(msg_collection.msg_list);
/* First msg in collection; initiate new window */
if (count == 1)
slurm_cond_signal(&msg_collection.cond);
/* Max msgs reached; terminate window */
if (count >= msg_collection.max_msg_cnt) {
msg_collection.max_msgs = true;
slurm_cond_signal(&msg_collection.cond);
}
slurm_mutex_unlock(&msg_collection.mutex);
if (wait) {
msg_aggr_t *msg_aggr = xmalloc(sizeof(msg_aggr_t));
uint16_t msg_timeout;
struct timeval now;
struct timespec timeout;
msg_aggr->msg_index = msg->msg_index;
msg_aggr->resp_callback = resp_callback;
slurm_cond_init(&msg_aggr->wait_cond, NULL);
slurm_mutex_lock(&msg_collection.aggr_mutex);
list_append(msg_collection.msg_aggr_list, msg_aggr);
msg_timeout = slurm_get_msg_timeout();
gettimeofday(&now, NULL);
timeout.tv_sec = now.tv_sec + msg_timeout;
timeout.tv_nsec = now.tv_usec * 1000;
wait_count++;
if (pthread_cond_timedwait(&msg_aggr->wait_cond,
&msg_collection.aggr_mutex,
&timeout) == ETIMEDOUT)
_handle_msg_aggr_ret(msg_aggr->msg_index, 1);
wait_count--;
slurm_mutex_unlock(&msg_collection.aggr_mutex);
if (!msg_collection.running && !wait_count)
slurm_mutex_destroy(&msg_collection.aggr_mutex);
_msg_aggr_free(msg_aggr);
}
}
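
/*
 * Wrap an already-packed RPC (its buffer, header fields and auth
 * credential) in a slurm_msg_t and add it to the collection without
 * waiting for a reply.
 */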
extern void msg_aggr_add_comp(Buf buffer, void *auth_cred, header_t *header)
{
slurm_msg_t *msg;
if (!msg_collection.running)
return;
msg = xmalloc_nz(sizeof(slurm_msg_t));
slurm_msg_t_init(msg);
msg->protocol_version = header->version;
msg->msg_type = header->msg_type;
msg->flags = header->flags;
msg->auth_cred = auth_cred;
msg->data = buffer;
msg->data_size = remaining_buf(buffer);
msg_aggr_add_msg(msg, 0, NULL);
}
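
/*
 * Process a composite response from the slurmctld. For each RC or
 * node registration response, wake the thread waiting on that
 * msg_index, first running its callback if one was registered (RC
 * responses skip the callback). For each nested composite response,
 * forward it to the node that originated it.
 */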
extern void msg_aggr_resp(slurm_msg_t *msg)
{
slurm_msg_t *next_msg;
composite_msg_t *comp_msg;
msg_aggr_t *msg_aggr;
ListIterator itr;
comp_msg = (composite_msg_t *)msg->data;
itr = list_iterator_create(comp_msg->msg_list);
if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE)
info("msg_aggr_resp: processing composite msg_list...");
while ((next_msg = list_next(itr))) {
switch (next_msg->msg_type) {
case RESPONSE_NODE_REGISTRATION:
case RESPONSE_SLURM_RC:
/* signal sending thread that slurmctld received this
* msg */
if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE)
info("msg_aggr_resp: response found for "
"index %u signaling sending thread",
next_msg->msg_index);
slurm_mutex_lock(&msg_collection.aggr_mutex);
if (!(msg_aggr = _handle_msg_aggr_ret(
next_msg->msg_index, 1))) {
				debug2("msg_aggr_resp: error: unable to "
				       "locate aggr message struct for "
				       "msg_index %u", next_msg->msg_index);
slurm_mutex_unlock(&msg_collection.aggr_mutex);
continue;
}
if (msg_aggr->resp_callback &&
(next_msg->msg_type != RESPONSE_SLURM_RC))
(*(msg_aggr->resp_callback))(next_msg);
slurm_cond_signal(&msg_aggr->wait_cond);
slurm_mutex_unlock(&msg_collection.aggr_mutex);
break;
case RESPONSE_MESSAGE_COMPOSITE:
comp_msg = (composite_msg_t *)next_msg->data;
/* set up the address here for the next node */
memcpy(&next_msg->address, &comp_msg->sender,
sizeof(slurm_addr_t));
if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE) {
char addrbuf[100];
slurm_print_slurm_addr(&next_msg->address,
addrbuf, 32);
info("msg_aggr_resp: composite response msg "
"found for %s", addrbuf);
}
slurm_send_only_node_msg(next_msg);
break;
default:
			error("msg_aggr_resp: invalid msg type in "
			      "composite msg_list");
break;
}
}
list_iterator_destroy(itr);
if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE)
		info("msg_aggr_resp: finished processing composite "
		     "msg_list");
}