/*****************************************************************************\
* pmi_server.c - Global PMI data as maintained within srun
*****************************************************************************
* Copyright (C) 2005-2006 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Morris Jette <jette1@llnl.gov>
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include <pthread.h>
#include <stdlib.h>
#include "slurm/slurm_errno.h"
#include "src/api/slurm_pmi.h"
#include "src/common/macros.h"
#include "src/common/read_config.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/timers.h"
#include "src/common/xsignal.h"
#include "src/common/xstring.h"
#include "src/common/xmalloc.h"
#define _DEBUG 0 /* non-zero for extra KVS logging */
#define _DEBUG_TIMING 0 /* non-zero for KVS timing details */
static pthread_mutex_t kvs_mutex = PTHREAD_MUTEX_INITIALIZER;
static int kvs_comm_cnt = 0;
static int kvs_updated = 0;
static struct kvs_comm **kvs_comm_ptr = NULL;
/* Track the time needed to process kvs_put requests.
 * This can be used to tune the PMI_TIME environment variable. */
static int min_time_kvs_put = 1000000;
static int max_time_kvs_put = 0;
static int tot_time_kvs_put = 0;
/* By default, assume the PMI protocol puts no duplicate keys
 * into the KVS, so duplicate-key checking can be skipped.
 */
static int pmi_kvs_no_dup_keys = 1;
struct barrier_resp {
uint16_t port;
char *hostname;
}; /* details for barrier task communications */
struct barrier_resp *barrier_ptr = NULL;
uint32_t barrier_resp_cnt = 0; /* tasks having reached barrier */
uint32_t barrier_cnt = 0; /* tasks needing to reach barrier */
pthread_mutex_t agent_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t agent_cond = PTHREAD_COND_INITIALIZER;
struct agent_arg {
struct barrier_resp *barrier_xmit_ptr;
int barrier_xmit_cnt;
struct kvs_comm **kvs_xmit_ptr;
int kvs_xmit_cnt;
}; /* details for message agent manager */
struct msg_arg {
struct barrier_resp *bar_ptr;
kvs_comm_set_t *kvs_ptr;
}; /* details for a single message transmission */
int agent_cnt = 0; /* number of active message agents */
int agent_max_cnt = 32; /* maximum number of active agents */
static void *_agent(void *x);
static struct kvs_comm *_find_kvs_by_name(char *name);
static struct kvs_comm **_kvs_comm_dup(void);
static void _kvs_xmit_tasks(void);
static void _merge_named_kvs(struct kvs_comm *kvs_orig,
struct kvs_comm *kvs_new);
static void _move_kvs(struct kvs_comm *kvs_new);
static void *_msg_thread(void *x);
static void _print_kvs(void);
/* Transmit the KVS keypairs to all tasks waiting at the barrier.
* This will take some time, so we work with a copy of the KVS keypairs.
* We also work with a private copy of the barrier data and clear the
* global data pointers so any new barrier requests get treated as
* completely independent of this one. */
static void _kvs_xmit_tasks(void)
{
struct agent_arg *args;
#if _DEBUG
info("All tasks at barrier, transmit KVS keypairs now");
#endif
/* The target PMI_TIME should be about the average processing time */
debug("kvs_put processing time min=%d, max=%d ave=%d (usec)",
min_time_kvs_put, max_time_kvs_put,
(tot_time_kvs_put / barrier_cnt));
min_time_kvs_put = 1000000;
max_time_kvs_put = 0;
tot_time_kvs_put = 0;
/* reset barrier info */
args = xmalloc(sizeof(struct agent_arg));
args->barrier_xmit_ptr = barrier_ptr;
args->barrier_xmit_cnt = barrier_cnt;
barrier_ptr = NULL;
barrier_resp_cnt = 0;
barrier_cnt = 0;
/* copy the new kvs data */
if (kvs_updated) {
args->kvs_xmit_ptr = _kvs_comm_dup();
args->kvs_xmit_cnt = kvs_comm_cnt;
kvs_updated = 0;
} else { /* No new data to transmit */
args->kvs_xmit_ptr = xmalloc(0);
args->kvs_xmit_cnt = 0;
}
/* Spawn a pthread to transmit it */
slurm_thread_create_detached(_agent, args);
}
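/* Send one PMI_KVS_GET_RESP message carrying the KVS keypairs (and
 * any host list for forwarding) to a single waiting task, then
 * decrement the active agent count */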
static void *_msg_thread(void *x)
{
struct msg_arg *msg_arg_ptr = (struct msg_arg *) x;
int rc, timeout;
slurm_msg_t msg_send;
slurm_msg_t_init(&msg_send);
slurm_msg_set_r_uid(&msg_send, SLURM_AUTH_UID_ANY);
debug2("KVS_Barrier msg to %s:%hu",
msg_arg_ptr->bar_ptr->hostname,
msg_arg_ptr->bar_ptr->port);
msg_send.msg_type = PMI_KVS_GET_RESP;
msg_send.data = (void *) msg_arg_ptr->kvs_ptr;
slurm_set_addr(&msg_send.address,
msg_arg_ptr->bar_ptr->port,
msg_arg_ptr->bar_ptr->hostname);
/*
* Multiple jobs and highly parallel jobs using PMI sometimes result in
* slow message responses and timeouts. Raise the default TCPTimeout
* by 10x.
*/
timeout = slurm_conf.msg_timeout * MSEC_IN_SEC * 10;
if (slurm_send_recv_rc_msg_only_one(&msg_send, &rc, timeout) < 0) {
error("slurm_send_recv_rc_msg_only_one to %s:%hu : %m",
msg_arg_ptr->bar_ptr->hostname,
msg_arg_ptr->bar_ptr->port);
} else if (rc != SLURM_SUCCESS) {
error("KVS_Barrier confirm from %s, rc=%d",
msg_arg_ptr->bar_ptr->hostname, rc);
} else {
/* successfully transmitted KVS keypairs */
}
slurm_mutex_lock(&agent_mutex);
agent_cnt--;
slurm_cond_signal(&agent_cond);
slurm_mutex_unlock(&agent_mutex);
xfree(x);
return NULL;
}
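/* Transmit the KVS keypairs to the waiting tasks. One message is
 * sent per host where possible; the first task contacted on each
 * host forwards the keypairs to up to PMI_FANOUT other tasks on
 * that host (or on any host if PMI_FANOUT_OFF_HOST is set). */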
static void *_agent(void *x)
{
struct agent_arg *args = (struct agent_arg *) x;
kvs_comm_set_t *kvs_set;
struct msg_arg *msg_args;
struct kvs_hosts *kvs_host_list;
int i, j, kvs_set_cnt = 0, host_cnt, pmi_fanout = 32;
int msg_sent = 0, max_forward = 0;
char *tmp, *fanout_off_host;
DEF_TIMERS;
tmp = getenv("PMI_FANOUT");
if (tmp) {
pmi_fanout = atoi(tmp);
if (pmi_fanout < 1)
pmi_fanout = 32;
}
fanout_off_host = getenv("PMI_FANOUT_OFF_HOST");
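/* Hypothetical example: "PMI_FANOUT=64 PMI_FANOUT_OFF_HOST=1 srun ..."
 * widens the fanout and permits forwarding between hosts */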
/* only send one message to each host,
* build table of the ports on each host */
START_TIMER;
kvs_set = xmalloc(sizeof(kvs_comm_set_t) * args->barrier_xmit_cnt);
for (i=0; i<args->barrier_xmit_cnt; i++) {
if (args->barrier_xmit_ptr[i].port == 0)
continue; /* already sent message to host */
kvs_host_list = xcalloc(pmi_fanout, sizeof(struct kvs_hosts));
host_cnt = 0;
/* This code enables key-pair forwarding between
* tasks. First task on the node gets the key-pairs
* with host/port information for all other tasks on
* that node it should forward the information to. */
for (j=(i+1); j<args->barrier_xmit_cnt; j++) {
if (args->barrier_xmit_ptr[j].port == 0)
continue; /* already sent message */
if ((fanout_off_host == NULL) &&
strcmp(args->barrier_xmit_ptr[i].hostname,
args->barrier_xmit_ptr[j].hostname))
continue; /* another host */
kvs_host_list[host_cnt].task_id = 0; /* not avail */
kvs_host_list[host_cnt].port =
args->barrier_xmit_ptr[j].port;
kvs_host_list[host_cnt].hostname =
args->barrier_xmit_ptr[j].hostname;
args->barrier_xmit_ptr[j].port = 0;/* don't reissue */
host_cnt++;
if (host_cnt >= pmi_fanout)
break;
}
msg_sent++;
max_forward = MAX(host_cnt, max_forward);
slurm_mutex_lock(&agent_mutex);
while (agent_cnt >= agent_max_cnt)
slurm_cond_wait(&agent_cond, &agent_mutex);
agent_cnt++;
slurm_mutex_unlock(&agent_mutex);
msg_args = xmalloc(sizeof(struct msg_arg));
msg_args->bar_ptr = &args->barrier_xmit_ptr[i];
msg_args->kvs_ptr = &kvs_set[kvs_set_cnt];
kvs_set[kvs_set_cnt].host_cnt = host_cnt;
kvs_set[kvs_set_cnt].kvs_host_ptr = kvs_host_list;
kvs_set[kvs_set_cnt].kvs_comm_recs = args->kvs_xmit_cnt;
kvs_set[kvs_set_cnt].kvs_comm_ptr = args->kvs_xmit_ptr;
kvs_set_cnt++;
if (agent_max_cnt == 1) {
/* TotalView slows down a great deal for
* pthread_create() calls, so just send the
* messages inline when TotalView is in use
* or for some other reason we only want
* one pthread. */
_msg_thread((void *) msg_args);
} else {
slurm_thread_create_detached(_msg_thread, msg_args);
}
}
verbose("Sent KVS info to %d nodes, up to %d tasks per node",
msg_sent, (max_forward+1));
/* wait for completion of all outgoing messages */
slurm_mutex_lock(&agent_mutex);
while (agent_cnt > 0)
slurm_cond_wait(&agent_cond, &agent_mutex);
slurm_mutex_unlock(&agent_mutex);
/* Release allocated memory */
for (i=0; i<kvs_set_cnt; i++)
xfree(kvs_set[i].kvs_host_ptr);
xfree(kvs_set);
for (i=0; i<args->barrier_xmit_cnt; i++)
xfree(args->barrier_xmit_ptr[i].hostname);
xfree(args->barrier_xmit_ptr);
for (i=0; i<args->kvs_xmit_cnt; i++) {
for (j=0; j<args->kvs_xmit_ptr[i]->kvs_cnt; j++) {
xfree(args->kvs_xmit_ptr[i]->kvs_keys[j]);
xfree(args->kvs_xmit_ptr[i]->kvs_values[j]);
}
xfree(args->kvs_xmit_ptr[i]->kvs_keys);
xfree(args->kvs_xmit_ptr[i]->kvs_values);
xfree(args->kvs_xmit_ptr[i]->kvs_name);
xfree(args->kvs_xmit_ptr[i]);
}
xfree(args->kvs_xmit_ptr);
xfree(args);
END_TIMER;
debug("kvs_xmit time %ld usec", DELTA_TIMER);
return NULL;
}
/* Duplicate the current KVS comm structure, copying only the keypairs
 * not yet transmitted and marking them as sent */
static struct kvs_comm **_kvs_comm_dup(void)
{
int i, j, cnt;
struct kvs_comm **rc_kvs;
rc_kvs = xmalloc(sizeof(struct kvs_comm *) * kvs_comm_cnt);
for (i=0; i<kvs_comm_cnt; i++) {
rc_kvs[i] = xmalloc(sizeof(struct kvs_comm));
rc_kvs[i]->kvs_name = xstrdup(kvs_comm_ptr[i]->kvs_name);
rc_kvs[i]->kvs_cnt = kvs_comm_ptr[i]->kvs_cnt;
rc_kvs[i]->kvs_keys =
xmalloc(sizeof(char *) * rc_kvs[i]->kvs_cnt);
rc_kvs[i]->kvs_values =
xmalloc(sizeof(char *) * rc_kvs[i]->kvs_cnt);
if (kvs_comm_ptr[i]->kvs_key_sent == NULL) {
kvs_comm_ptr[i]->kvs_key_sent =
xmalloc(sizeof(uint16_t) *
kvs_comm_ptr[i]->kvs_cnt);
}
cnt = 0;
for (j=0; j<rc_kvs[i]->kvs_cnt; j++) {
if (kvs_comm_ptr[i]->kvs_key_sent[j])
continue;
rc_kvs[i]->kvs_keys[cnt] =
xstrdup(kvs_comm_ptr[i]->kvs_keys[j]);
rc_kvs[i]->kvs_values[cnt] =
xstrdup(kvs_comm_ptr[i]->kvs_values[j]);
cnt++;
kvs_comm_ptr[i]->kvs_key_sent[j] = 1;
}
rc_kvs[i]->kvs_cnt = cnt;
}
return rc_kvs;
}
/* return pointer to named kvs element or NULL if not found */
static struct kvs_comm *_find_kvs_by_name(char *name)
{
int i;
for (i=0; i<kvs_comm_cnt; i++) {
if (strcmp(kvs_comm_ptr[i]->kvs_name, name))
continue;
return kvs_comm_ptr[i];
}
return NULL;
}
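/* Merge the keypairs from kvs_new into kvs_orig, moving pointers
 * rather than copying values. By default new keypairs are simply
 * appended; with duplicate-key checking enabled, a key already in
 * kvs_orig gets its value replaced and is marked for retransmission. */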
static void _merge_named_kvs(struct kvs_comm *kvs_orig,
struct kvs_comm *kvs_new)
{
int i, j;
for (i=0; i<kvs_new->kvs_cnt; i++) {
if (pmi_kvs_no_dup_keys)
goto no_dup;
for (j=0; j<kvs_orig->kvs_cnt; j++) {
if (strcmp(kvs_new->kvs_keys[i], kvs_orig->kvs_keys[j]))
continue;
xfree(kvs_orig->kvs_values[j]);
if (kvs_orig->kvs_key_sent)
kvs_orig->kvs_key_sent[j] = 0;
kvs_orig->kvs_values[j] = kvs_new->kvs_values[i];
kvs_new->kvs_values[i] = NULL;
break;
}
if (j < kvs_orig->kvs_cnt)
continue; /* key found, value updated above */
no_dup:
/* append it */
kvs_orig->kvs_cnt++;
xrealloc(kvs_orig->kvs_keys,
(sizeof(char *) * kvs_orig->kvs_cnt));
xrealloc(kvs_orig->kvs_values,
(sizeof(char *) * kvs_orig->kvs_cnt));
kvs_orig->kvs_keys[kvs_orig->kvs_cnt-1] = kvs_new->kvs_keys[i];
kvs_orig->kvs_values[kvs_orig->kvs_cnt-1] =
kvs_new->kvs_values[i];
kvs_new->kvs_keys[i] = NULL;
kvs_new->kvs_values[i] = NULL;
}
if (kvs_orig->kvs_key_sent) {
xrealloc(kvs_orig->kvs_key_sent,
(sizeof(uint16_t) * kvs_orig->kvs_cnt));
}
}
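/* Append kvs_new to the global KVS table, taking ownership of the
 * structure. Caller must hold kvs_mutex. */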
static void _move_kvs(struct kvs_comm *kvs_new)
{
kvs_comm_ptr = xrealloc(kvs_comm_ptr, (sizeof(struct kvs_comm *) *
(kvs_comm_cnt + 1)));
kvs_comm_ptr[kvs_comm_cnt] = kvs_new;
kvs_comm_cnt++;
}
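/* Log all of the KVS keypairs (only when compiled with _DEBUG set) */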
static void _print_kvs(void)
{
#if _DEBUG
int i, j;
info("KVS dump start");
for (i=0; i<kvs_comm_cnt; i++) {
for (j=0; j<kvs_comm_ptr[i]->kvs_cnt; j++) {
info("KVS: %s:%s:%s", kvs_comm_ptr[i]->kvs_name,
kvs_comm_ptr[i]->kvs_keys[j],
kvs_comm_ptr[i]->kvs_values[j]);
}
}
#endif
}
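/* Merge the keypairs from a PMI put request into the global KVS data
 * and record the processing time for PMI_TIME tuning */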
extern int pmi_kvs_put(kvs_comm_set_t *kvs_set_ptr)
{
int i, usec_timer;
struct kvs_comm *kvs_ptr;
static int pmi_kvs_no_dup_keys_set = 0;
DEF_TIMERS;
if (pmi_kvs_no_dup_keys_set == 0) {
/* MPI implementations do not normally put duplicate keys
 * into the KVS, so the checking for duplicate keys can be
 * skipped. A user who does store duplicate keys must set
 * this environment variable. When checking is enabled and
 * a duplicate key is found, the previous value is
 * discarded.
 */
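/* Hypothetical usage: "SLURM_PMI_KVS_DUP_KEYS=1 srun -n4 ./a.out"
 * (any value, even an empty string, enables the check) */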
char *env = getenv("SLURM_PMI_KVS_DUP_KEYS");
if (env)
pmi_kvs_no_dup_keys = 0;
pmi_kvs_no_dup_keys_set = 1;
}
/* Merge new data with old.
* NOTE: We just move pointers rather than copy data where
* possible for improved performance */
START_TIMER;
slurm_mutex_lock(&kvs_mutex);
for (i=0; i<kvs_set_ptr->kvs_comm_recs; i++) {
kvs_ptr = _find_kvs_by_name(
kvs_set_ptr->kvs_comm_ptr[i]->kvs_name);
if (kvs_ptr) {
_merge_named_kvs(kvs_ptr,
kvs_set_ptr->kvs_comm_ptr[i]);
} else {
_move_kvs(kvs_set_ptr->kvs_comm_ptr[i]);
kvs_set_ptr->kvs_comm_ptr[i] = NULL;
}
}
_print_kvs();
kvs_updated = 1;
slurm_mutex_unlock(&kvs_mutex);
END_TIMER;
usec_timer = DELTA_TIMER;
min_time_kvs_put = MIN(min_time_kvs_put, usec_timer);
max_time_kvs_put = MAX(max_time_kvs_put, usec_timer);
tot_time_kvs_put += usec_timer;
return SLURM_SUCCESS;
}
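/* Process a KVS barrier request from one task. Once every task in
 * the job step has checked in, transmit the accumulated KVS keypairs
 * to all of them. */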
extern int pmi_kvs_get(kvs_get_msg_t *kvs_get_ptr)
{
int rc = SLURM_SUCCESS;
#if _DEBUG_TIMING
static uint32_t tm[10000];
int cur_time, i;
struct timeval tv;
#endif
#if _DEBUG
info("pmi_kvs_get: rank:%u size:%u port:%hu, host:%s",
kvs_get_ptr->task_id, kvs_get_ptr->size,
kvs_get_ptr->port, kvs_get_ptr->hostname);
#endif
if (kvs_get_ptr->size == 0) {
error("PMK_KVS_Barrier reached with size == 0");
return SLURM_ERROR;
}
#if _DEBUG_TIMING
gettimeofday(&tv, NULL);
cur_time = ((tv.tv_sec % 1000) * 1000000) + tv.tv_usec;
if (kvs_get_ptr->task_id < 10000)
tm[kvs_get_ptr->task_id] = cur_time;
#endif
slurm_mutex_lock(&kvs_mutex);
if (barrier_cnt == 0) {
barrier_cnt = kvs_get_ptr->size;
barrier_ptr = xmalloc(sizeof(struct barrier_resp)*barrier_cnt);
} else if (barrier_cnt != kvs_get_ptr->size) {
error("PMK_KVS_Barrier task count inconsistent (%u != %u)",
barrier_cnt, kvs_get_ptr->size);
rc = SLURM_ERROR;
goto fini;
}
if (kvs_get_ptr->task_id >= barrier_cnt) {
error("PMK_KVS_Barrier task count(%u) >= size(%u)",
kvs_get_ptr->task_id, barrier_cnt);
rc = SLURM_ERROR;
goto fini;
}
if (barrier_ptr[kvs_get_ptr->task_id].port == 0)
barrier_resp_cnt++;
else
error("PMK_KVS_Barrier duplicate request from task %u",
kvs_get_ptr->task_id);
barrier_ptr[kvs_get_ptr->task_id].port = kvs_get_ptr->port;
barrier_ptr[kvs_get_ptr->task_id].hostname = kvs_get_ptr->hostname;
kvs_get_ptr->hostname = NULL; /* just moved the pointer */
if (barrier_resp_cnt == barrier_cnt) {
#if _DEBUG_TIMING
info("task[%d] at %u", 0, tm[0]);
for (i=1; ((i<barrier_cnt) && (i<10000)); i++) {
cur_time = (int) tm[i] - (int) tm[i-1];
info("task[%d] at %u diff %d", i, tm[i], cur_time);
}
#endif
_kvs_xmit_tasks();
}
fini: slurm_mutex_unlock(&kvs_mutex);
return rc;
}
/*
* Set the maximum number of threads to be used by the PMI server code.
* The PMI server code is used internally by the slurm_step_launch() function
* to support MPI libraries that bootstrap themselves using PMI.
*/
extern void pmi_server_max_threads(int max_threads)
{
if (max_threads <= 0)
error("pmi server max threads must be greater than zero");
else
agent_max_cnt = max_threads;
}
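/*
 * For example, a caller that wants the messages sent inline rather
 * than from separate pthreads (e.g. when running under TotalView)
 * might call:
 *
 * pmi_server_max_threads(1);
 *
 * before invoking slurm_step_launch().
 */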
/* Free a kvs_comm structure and all of its keypairs
 * (copied from slurm_pmi.c) */
static void _free_kvs_comm(struct kvs_comm *kvs_comm_ptr)
{
int i;
if (kvs_comm_ptr == NULL)
return;
for (i=0; i<kvs_comm_ptr->kvs_cnt; i++) {
xfree(kvs_comm_ptr->kvs_keys[i]);
xfree(kvs_comm_ptr->kvs_values[i]);
}
xfree(kvs_comm_ptr->kvs_key_sent);
xfree(kvs_comm_ptr->kvs_name);
xfree(kvs_comm_ptr->kvs_keys);
xfree(kvs_comm_ptr->kvs_values);
xfree(kvs_comm_ptr);
}
/* Free the local KVS set */
extern void pmi_kvs_free(void)
{
int i;
slurm_mutex_lock(&kvs_mutex);
for (i = 0; i < kvs_comm_cnt; i++) {
_free_kvs_comm(kvs_comm_ptr[i]);
}
xfree(kvs_comm_ptr);
kvs_comm_cnt = 0;
slurm_mutex_unlock(&kvs_mutex);
}