blob: a3cdefef210f999df9d7b74d59d61e6f81beb567 [file] [log] [blame] [edit]
/*****************************************************************************\
** tree.c - PMI tree communication handling code
*****************************************************************************
* Copyright (C) 2011-2012 National University of Defense Technology.
* Written by Hongjia Cao <hjcao@nudt.edu.cn>.
* All rights reserved.
*
* This file is part of SLURM, a resource management program.
* For details, see <https://computing.llnl.gov/linux/slurm/>.
* Please also read the included file: DISCLAIMER.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#if HAVE_CONFIG_H
# include "config.h"
#endif
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
#include "src/common/slurm_xlator.h"
#include "src/common/slurm_protocol_interface.h"
#include "src/common/slurm_protocol_api.h"
#include "kvs.h"
#include "spawn.h"
#include "client.h"
#include "setup.h"
#include "pmi.h"
static int _handle_kvs_fence(int fd, Buf buf);
static int _handle_kvs_fence_resp(int fd, Buf buf);
static int _handle_spawn(int fd, Buf buf);
static int _handle_spawn_resp(int fd, Buf buf);
static int (*tree_cmd_handlers[]) (int fd, Buf buf) = {
_handle_kvs_fence,
_handle_kvs_fence_resp,
_handle_spawn,
_handle_spawn_resp,
NULL
};
static char *tree_cmd_names[] = {
"TREE_CMD_KVS_FENCE",
"TREE_CMD_KVS_FENCE_RESP",
"TREE_CMD_SPAWN",
"TREE_CMD_SPAWN_RESP",
NULL,
};
static int
_handle_kvs_fence(int fd, Buf buf)
{
uint32_t from_nodeid, num_children, temp32;
char *from_node = NULL;
safe_unpack32(&from_nodeid, buf);
safe_unpackstr_xmalloc(&from_node, &temp32, buf);
safe_unpack32(&num_children, buf);
debug3("mpi/pmi2: in _handle_kvs_fence, from node %u(%s) representing"
" %u offspring", from_nodeid, from_node, num_children);
if (tasks_to_wait == 0 && children_to_wait == 0) {
tasks_to_wait = job_info.ltasks;
children_to_wait = tree_info.num_children;
}
children_to_wait -= num_children;
temp_kvs_merge(buf);
if (children_to_wait == 0 && tasks_to_wait == 0) {
temp_kvs_send();
}
debug3("mpi/pmi2: out _handle_kvs_fence, tasks_to_wait=%d, "
"children_to_wait=%d", tasks_to_wait, children_to_wait);
return SLURM_SUCCESS;
unpack_error:
error("mpi/pmi2: failed to unpack kvs fence message");
return SLURM_ERROR;
}
static int
_handle_kvs_fence_resp(int fd, Buf buf)
{
char *key, *val;
int rc = 0, i = 0;
client_resp_t *resp;
uint32_t temp32;
debug3("mpi/pmi2: in _handle_kvs_fence_resp");
temp32 = remaining_buf(buf);
debug3("mpi/pmi2: buf length: %u", temp32);
/* put kvs into local hash */
while (remaining_buf(buf) > 0) {
safe_unpackstr_xmalloc(&key, &temp32, buf);
safe_unpackstr_xmalloc(&val, &temp32, buf);
kvs_put(key, val);
//temp32 = remaining_buf(buf);
xfree(key);
xfree(val);
}
resp:
/* send fence_resp/barrier_out to tasks */
resp = client_resp_new();
if ( is_pmi11() ) {
client_resp_append(resp, CMD_KEY"="BARRIEROUT_CMD" "
RC_KEY"=%d\n", rc);
} else if (is_pmi20()) {
client_resp_append(resp, CMD_KEY"="KVSFENCERESP_CMD";"
RC_KEY"=%d;", rc);
}
for (i = 0; i < job_info.ltasks; i ++) {
client_resp_send(resp, STEPD_PMI_SOCK(i));
}
client_resp_free(resp);
return rc;
unpack_error:
error("mpi/pmi2: unpack kvs error in fence resp");
rc = SLURM_ERROR;
goto resp;
}
/* only called in srun */
static int
_handle_spawn(int fd, Buf buf)
{
int rc;
spawn_req_t *req = NULL;
spawn_resp_t *resp = NULL;
debug3("mpi/pmi2: in _handle_spawn");
rc = spawn_req_unpack(&req, buf);
if (rc != SLURM_SUCCESS) {
error("mpi/pmi2: failed to unpack spawn request spawn cmd");
/* We lack a hostname to send response below.
resp = spawn_resp_new();
resp->rc = rc;
rc = spawn_resp_send_to_stepd(resp, req->from_node);
spawn_resp_free(resp); */
return rc;
}
/* assign a sequence number */
req->seq = spawn_seq_next();
resp = spawn_resp_new();
resp->seq = req->seq;
resp->jobid = NULL;
resp->error_cnt = 0;
/* fork srun */
rc = spawn_job_do_spawn(req);
if (rc != SLURM_SUCCESS) {
error("mpi/pmi2: failed to spawn job");
resp->rc = rc;
} else {
spawn_psr_enqueue(resp->seq, -1, -1, req->from_node);
resp->rc = SLURM_SUCCESS; /* temp resp */
}
spawn_resp_send_to_fd(resp, fd);
spawn_req_free(req);
spawn_resp_free(resp);
debug3("mpi/pmi2: out _handle_spawn");
return rc;
}
static int
_send_task_spawn_resp_pmi20(spawn_resp_t *spawn_resp, int task_fd,
int task_lrank)
{
int i, rc;
client_resp_t *task_resp;
char *error_codes = NULL;
task_resp = client_resp_new();
client_resp_append(task_resp,
CMD_KEY"="SPAWNRESP_CMD";"
RC_KEY"=%d;"
JOBID_KEY"=%s;",
spawn_resp->rc,
spawn_resp->jobid);
/* seems that simple2pmi does not consider rc */
if (spawn_resp->rc != SLURM_SUCCESS) {
xstrfmtcat(error_codes, "%d", spawn_resp->rc);
}
if (spawn_resp->error_cnt > 0) {
if (error_codes) {
xstrfmtcat(error_codes, ",%d", spawn_resp->error_codes[0]);
} else {
xstrfmtcat(error_codes, "%d", spawn_resp->error_codes[0]);
}
for (i = 1; i < spawn_resp->error_cnt; i ++) {
xstrfmtcat(error_codes, ",%d",
spawn_resp->error_codes[i]);
}
}
if (error_codes) {
client_resp_append(task_resp, ERRCODES_KEY"=%s;",
error_codes);
xfree(error_codes);
}
rc = client_resp_send(task_resp, task_fd);
client_resp_free(task_resp);
return rc;
}
static int
_send_task_spawn_resp_pmi11(spawn_resp_t *spawn_resp, int task_fd,
int task_lrank)
{
int i, rc;
client_resp_t *task_resp;
char *error_codes = NULL;
task_resp = client_resp_new();
client_resp_append(task_resp,
CMD_KEY"="SPAWNRESULT_CMD" "
RC_KEY"=%d "
JOBID_KEY"=%s", /* JOBID_KEY is not required */
spawn_resp->rc,
spawn_resp->jobid);
if (spawn_resp->rc != SLURM_SUCCESS) {
xstrfmtcat(error_codes, "%d", spawn_resp->rc);
}
if (spawn_resp->error_cnt > 0) {
if (error_codes) {
xstrfmtcat(error_codes, ",%d", spawn_resp->error_codes[0]);
} else {
xstrfmtcat(error_codes, "%d", spawn_resp->error_codes[0]);
}
for (i = 1; i < spawn_resp->error_cnt; i ++) {
xstrfmtcat(error_codes, ",%d",
spawn_resp->error_codes[i]);
}
}
if (error_codes) {
client_resp_append(task_resp, " "ERRCODES_KEY"=%s\n",
error_codes);
xfree(error_codes);
} else {
client_resp_append(task_resp, "\n");
}
rc = client_resp_send(task_resp, task_fd);
client_resp_free(task_resp);
return rc;
}
/* called in stepd and srun */
static int
_handle_spawn_resp(int fd, Buf buf)
{
int rc, task_fd, task_lrank;
spawn_resp_t *spawn_resp;
char *from_node = NULL;
debug3("mpi/pmi2: in _handle_spawn_resp");
rc = spawn_resp_unpack(&spawn_resp, buf);
if (rc != SLURM_SUCCESS) {
error("mpi/pmi2: failed to unpack spawn response tree cmd");
return SLURM_ERROR;
}
rc = spawn_psr_dequeue(spawn_resp->seq, &task_fd, &task_lrank, &from_node);
if (rc != SLURM_SUCCESS) {
error("mpi/pmi2: spawn response not matched in psr list");
return SLURM_ERROR;
}
if (from_node == NULL) { /* stepd */
debug3("mpi/pmi2: spawned tasks of %s launched",
spawn_resp->jobid);
if (is_pmi20()) {
_send_task_spawn_resp_pmi20(spawn_resp, task_fd, task_lrank);
} else if (is_pmi11()) {
_send_task_spawn_resp_pmi11(spawn_resp, task_fd, task_lrank);
}
} else { /* srun */
debug3("mpi/pmi2: spawned tasks of %s launched",
spawn_resp->jobid);
/* forward resp to stepd */
spawn_resp_send_to_stepd(spawn_resp, from_node);
xfree(from_node);
}
spawn_resp_free(spawn_resp);
return rc;
}
/**************************************************************/
extern int
handle_tree_cmd(int fd)
{
char *req_buf = NULL;
uint32_t len;
Buf buf = NULL;
uint16_t cmd;
int rc;
debug3("mpi/pmi2: in handle_tree_cmd");
safe_read(fd, &len, sizeof(uint32_t));
len = ntohl(len);
safe_read(fd, &cmd, sizeof(uint16_t));
cmd = ntohs(cmd);
if (cmd >= TREE_CMD_COUNT) {
error("mpi/pmi2: invalid tree req command");
return SLURM_ERROR;
}
len -= sizeof(cmd);
req_buf = xmalloc(len + 1);
safe_read(fd, req_buf, len);
buf = create_buf(req_buf, len); /* req_buf taken by buf */
debug3("mpi/pmi2: got tree cmd: %hu(%s)", cmd, tree_cmd_names[cmd]);
rc = tree_cmd_handlers[cmd](fd, buf);
free_buf (buf);
debug3("mpi/pmi2: out handle_tree_cmd");
return rc;
rwfail:
xfree(req_buf);
return SLURM_ERROR;
}
extern int
tree_msg_to_srun(uint32_t len, char *msg)
{
int fd, rc;
fd = _slurm_open_stream(tree_info.srun_addr, true);
rc = _slurm_msg_sendto(fd, msg, len, SLURM_PROTOCOL_NO_SEND_RECV_FLAGS);
close(fd);
return rc;
}
extern int
tree_msg_to_srun_with_resp(uint32_t len, char *msg, Buf *resp_ptr)
{
int fd, rc;
Buf buf = NULL;
char *data = NULL;
xassert(resp_ptr != NULL);
fd = _slurm_open_stream(tree_info.srun_addr, true);
rc = _slurm_msg_sendto(fd, msg, len, SLURM_PROTOCOL_NO_SEND_RECV_FLAGS);
if (rc == len) { /* all data sent */
safe_read(fd, &len, sizeof(len));
len = ntohl(len);
data = xmalloc(len);
safe_read(fd, data, len);
buf = create_buf(data, len);
*resp_ptr = buf;
rc = SLURM_SUCCESS;
} else {
rc = SLURM_ERROR;
}
close(fd);
return rc;
rwfail:
close (fd);
xfree(data);
return SLURM_ERROR;
}
extern int
tree_msg_to_stepds(char *nodelist, uint32_t len, char *msg)
{
int rc;
rc = slurm_forward_data(nodelist,
tree_sock_addr,
len,
msg);
return rc;
}