blob: dbbbc3457acec9a4d5a5991260db2a0903fb699f [file] [log] [blame]
/*****************************************************************************\
** pmi2.c - PMI2 client(task) command handling
*****************************************************************************
* Copyright (C) 2011-2012 National University of Defense Technology.
* Written by Hongjia Cao <hjcao@nudt.edu.cn>.
* All rights reserved.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#if defined(__FreeBSD__)
#include <sys/socket.h> /* AF_INET */
#endif
#include <fcntl.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/types.h>
#include "src/common/slurm_xlator.h"
#include "src/common/log.h"
#include "src/common/xmalloc.h"
#include "pmi.h"
#include "client.h"
#include "spawn.h"
#include "kvs.h"
#include "info.h"
#include "setup.h"
#include "agent.h"
#include "nameserv.h"
#include "ring.h"
/*
 * PMI2 command handlers.
 *
 * Each handler receives the task's PMI socket (fd), the task's local rank
 * (lrank) and the request read from that socket (req). The int it returns
 * is passed back unchanged by handle_pmi2_cmd().
 */
static int _handle_fullinit(int fd, int lrank, client_req_t *req);
static int _handle_finalize(int fd, int lrank, client_req_t *req);
static int _handle_abort(int fd, int lrank, client_req_t *req);
static int _handle_job_getid(int fd, int lrank, client_req_t *req);
static int _handle_job_connect(int fd, int lrank, client_req_t *req);
static int _handle_job_disconnect(int fd, int lrank, client_req_t *req);
static int _handle_ring(int fd, int lrank, client_req_t *req);
static int _handle_kvs_put(int fd, int lrank, client_req_t *req);
static int _handle_kvs_fence(int fd, int lrank, client_req_t *req);
static int _handle_kvs_get(int fd, int lrank, client_req_t *req);
static int _handle_info_getnodeattr(int fd, int lrank, client_req_t *req);
static int _handle_info_putnodeattr(int fd, int lrank, client_req_t *req);
static int _handle_info_getjobattr(int fd, int lrank, client_req_t *req);
static int _handle_name_publish(int fd, int lrank, client_req_t *req);
static int _handle_name_unpublish(int fd, int lrank, client_req_t *req);
static int _handle_name_lookup(int fd, int lrank, client_req_t *req);
static int _handle_spawn(int fd, int lrank, client_req_t *req);
/*
 * Dispatch table mapping a request's command name (req->cmd) to its
 * handler. handle_pmi2_cmd() scans it linearly with xstrcmp(); the final
 * { NULL, NULL } entry is the end-of-table sentinel and must stay last.
 */
static struct {
char *cmd;
int (*handler)(int fd, int lrank, client_req_t *req);
} pmi2_cmd_handlers[] = {
{ FULLINIT_CMD, _handle_fullinit },
{ FINALIZE_CMD, _handle_finalize },
{ ABORT_CMD, _handle_abort },
{ JOBGETID_CMD, _handle_job_getid },
{ JOBCONNECT_CMD, _handle_job_connect },
{ JOBDISCONNECT_CMD, _handle_job_disconnect },
{ RING_CMD, _handle_ring },
{ KVSPUT_CMD, _handle_kvs_put },
{ KVSFENCE_CMD, _handle_kvs_fence },
{ KVSGET_CMD, _handle_kvs_get },
{ GETNODEATTR_CMD, _handle_info_getnodeattr },
{ PUTNODEATTR_CMD, _handle_info_putnodeattr },
{ GETJOBATTR_CMD, _handle_info_getjobattr },
{ NAMEPUBLISH_CMD, _handle_name_publish },
{ NAMEUNPUBLISH_CMD, _handle_name_unpublish },
{ NAMELOOKUP_CMD, _handle_name_lookup },
{ SPAWN_CMD, _handle_spawn },
{ NULL, NULL},
};
/*
 * Handle a PMI2 "fullinit" request.
 *
 * The request must carry the PMI job id, PMI rank and threaded flag. Only
 * the first missing key is logged; any missing key makes the reply's rc
 * PMI2_ERR_INVALID_ARG. A full response (version, rank, size, appnum,
 * debug flags, optional spawner job id) is sent in every case.
 *
 * Returns the result of client_resp_send().
 */
static int
_handle_fullinit(int fd, int lrank, client_req_t *req)
{
	int pmi_jobid, pmi_rank;
	bool threaded;
	int status = PMI2_SUCCESS;
	int ret;
	client_resp_t *reply;

	debug3("mpi/pmi2: _handle_fullinit");
	client_req_parse_body(req);

	if (!client_req_get_int(req, PMIJOBID_KEY, &pmi_jobid)) {
		error(PMIJOBID_KEY" missing in fullinit command");
		status = PMI2_ERR_INVALID_ARG;
	} else if (!client_req_get_int(req, PMIRANK_KEY, &pmi_rank)) {
		error(PMIRANK_KEY" missing in fullinit command");
		status = PMI2_ERR_INVALID_ARG;
	} else if (!client_req_get_bool(req, THREADED_KEY, &threaded)) {
		error(THREADED_KEY" missing in fullinit command");
		status = PMI2_ERR_INVALID_ARG;
	}
	/* TODO: use threaded */

	reply = client_resp_new();
	/* what's the difference between DEBUGGED and VERBOSE? */
	/* TODO: APPNUM */
	client_resp_append(reply, CMD_KEY"="FULLINITRESP_CMD";" RC_KEY"=%d;"
			   PMIVERSION_KEY"=%d;" PMISUBVER_KEY"=%d;"
			   RANK_KEY"=%d;" SIZE_KEY"=%d;"
			   APPNUM_KEY"=-1;" DEBUGGED_KEY"="FALSE_VAL";"
			   PMIVERBOSE_KEY"=%s;",
			   status,
			   PMI20_VERSION, PMI20_SUBVERSION,
			   job_info.gtids[lrank], job_info.ntasks,
			   (job_info.pmi_debugged ? TRUE_VAL : FALSE_VAL));
	if (job_info.spawner_jobid)
		client_resp_append(reply, SPAWNERJOBID_KEY"=%s;",
				   job_info.spawner_jobid);

	ret = client_resp_send(reply, fd);
	client_resp_free(reply);

	debug3("mpi/pmi2: fullinit done");
	return ret;
}
/*
 * Handle a PMI2 "finalize" request.
 *
 * Replies with rc=0, then shuts down and closes the task's PMI socket and
 * calls task_finalize() for lrank. req is not used, so it may be NULL.
 *
 * Returns the result of client_resp_send().
 */
static int
_handle_finalize(int fd, int lrank, client_req_t *req)
{
	int send_rc;
	client_resp_t *reply = client_resp_new();

	client_resp_append(reply, CMD_KEY"="FINALIZERESP_CMD";"
			   RC_KEY"=%d;", 0);
	send_rc = client_resp_send(reply, fd);
	client_resp_free(reply);

	/* the task is done with PMI: tear down its channel */
	shutdown(fd, SHUT_RDWR);
	close(fd);
	task_finalize(lrank);

	return send_rc;
}
/*
 * Handle a PMI2 "abort" request.
 *
 * No response is sent. If the request sets the is-world flag, the whole
 * job step is sent SIGKILL via slurm_kill_job_step().
 *
 * Always returns SLURM_SUCCESS.
 */
static int
_handle_abort(int fd, int lrank, client_req_t *req)
{
	bool world_abort = false;

	debug3("mpi/pmi2: in _handle_abort");
	client_req_parse_body(req);
	client_req_get_bool(req, ISWORLD_KEY, &world_abort);

	/* a non-world abort needs no action on our side */
	if (!world_abort)
		return SLURM_SUCCESS;

	slurm_kill_job_step(job_info.step_id.job_id,
			    job_info.step_id.step_id, SIGKILL, 0);
	return SLURM_SUCCESS;
}
/*
 * Handle a PMI2 "job-getid" request: reply with rc=0 and the PMI job id
 * stored in job_info.pmi_jobid.
 *
 * Returns the result of client_resp_send().
 */
static int
_handle_job_getid(int fd, int lrank, client_req_t *req)
{
	client_resp_t *reply;
	int send_rc;

	debug3("mpi/pmi2: in _handle_job_getid");

	reply = client_resp_new();
	client_resp_append(reply, CMD_KEY"="JOBGETIDRESP_CMD";" RC_KEY"=0;"
			   JOBID_KEY"=%s;", job_info.pmi_jobid);
	send_rc = client_resp_send(reply, fd);
	client_resp_free(reply);

	debug3("mpi/pmi2: out _handle_job_getid");
	return send_rc;
}
/*
 * Handle a PMI2 "job-connect" request.
 *
 * Not implemented: the request is only logged as an error and no reply is
 * written to fd.
 * NOTE(review): a client blocking on the reply may hang — confirm
 * whether clients ever send this command.
 *
 * Always returns SLURM_SUCCESS.
 */
static int
_handle_job_connect(int fd, int lrank, client_req_t *req)
{
	error("mpi/pmi2: job connect not implemented for now");
	return SLURM_SUCCESS;
}
/*
 * Handle a PMI2 "job-disconnect" request.
 *
 * Not implemented: the request is only logged as an error and no reply is
 * written to fd.
 * NOTE(review): a client blocking on the reply may hang — confirm
 * whether clients ever send this command.
 *
 * Always returns SLURM_SUCCESS.
 */
static int
_handle_job_disconnect(int fd, int lrank, client_req_t *req)
{
	error("mpi/pmi2: job disconnect not implemented for now");
	return SLURM_SUCCESS;
}
/*
 * Handle a PMI2 "ring" request.
 *
 * Extracts the count, left and right values from the request and passes
 * them to pmix_ring_in(). No reply is written here; the response is sent
 * back to the client from the pmix_ring_out call.
 *
 * Returns the result of pmix_ring_in().
 */
static int
_handle_ring(int fd, int lrank, client_req_t *req)
{
	char *left_val = NULL;
	char *right_val = NULL;
	int ring_count = 0;
	int ring_rc;

	debug3("mpi/pmi2: in _handle_ring");

	client_req_parse_body(req);
	client_req_get_int(req, RING_COUNT_KEY, &ring_count);
	client_req_get_str(req, RING_LEFT_KEY, &left_val);
	client_req_get_str(req, RING_RIGHT_KEY, &right_val);

	/*
	 * Ring ids list all application tasks first, followed by the
	 * stepds, so a task's ring id is simply its local rank.
	 */
	ring_rc = pmix_ring_in(lrank, ring_count, left_val, right_val);

	xfree(left_val);
	xfree(right_val);

	debug3("mpi/pmi2: out _handle_ring");
	return ring_rc;
}
/*
 * Handle a PMI2 "kvs-put" request.
 *
 * The key/value pair is not added to the local hash. It is staged with
 * temp_kvs_add() so it can be forwarded upward. The reply carries
 * temp_kvs_add()'s return code.
 *
 * Returns the result of client_resp_send().
 */
static int
_handle_kvs_put(int fd, int lrank, client_req_t *req)
{
	char *kvs_key = NULL;
	char *kvs_val = NULL;
	client_resp_t *reply;
	int add_rc, send_rc;

	debug3("mpi/pmi2: in _handle_kvs_put");

	client_req_parse_body(req);
	client_req_get_str(req, KEY_KEY, &kvs_key);
	client_req_get_str(req, VALUE_KEY, &kvs_val);

	add_rc = temp_kvs_add(kvs_key, kvs_val);
	xfree(kvs_key);
	xfree(kvs_val);

	reply = client_resp_new();
	client_resp_append(reply, CMD_KEY"="KVSPUTRESP_CMD";" RC_KEY"=%d;",
			   add_rc);
	send_rc = client_resp_send(reply, fd);
	client_resp_free(reply);

	debug3("mpi/pmi2: out _handle_kvs_put");
	return send_rc;
}
/*
 * Handle a PMI2 "kvs-fence" request from one local task.
 *
 * No reply is written to fd here. A fence round on this node completes
 * when every local task (job_info.ltasks of them) has entered the fence
 * and children_to_wait is zero. At that point the staged KVS data is sent
 * upward with temp_kvs_send(). Going by the error message, the target is
 * tree_info.parent_node, or srun when there is no parent.
 * NOTE(review): children_to_wait is presumably decremented elsewhere as
 * child nodes report in, and clients are presumably answered once the
 * upstream KVS response arrives (see waiting_kvs_resp) — confirm in kvs.c.
 *
 * Returns 0 (SLURM_SUCCESS) unless temp_kvs_send() fails. In that case
 * send_kvs_fence_resp_to_clients() is called with the error, the job step
 * is killed, and the error code is returned.
 */
static int
_handle_kvs_fence(int fd, int lrank, client_req_t *req)
{
int rc = 0;
debug3("mpi/pmi2: in _handle_kvs_fence, from task %d",
job_info.gtids[lrank]);
/* Both counters at zero means no fence round is in progress: this is the
 * first arrival of a new round, so arm the counters for it. */
if (tasks_to_wait == 0 && children_to_wait == 0) {
tasks_to_wait = job_info.ltasks;
children_to_wait = tree_info.num_children;
}
/* this task has now entered the fence */
tasks_to_wait --;
/* mutex protection is not required */
/* last participant in: forward the collected KVS data upward */
if (tasks_to_wait == 0 && children_to_wait == 0) {
rc = temp_kvs_send();
if (rc != SLURM_SUCCESS) {
error("mpi/pmi2: %d failed to send temp kvs to %s",
__LINE__, tree_info.parent_node ?: "srun");
send_kvs_fence_resp_to_clients(
rc,
"mpi/pmi2: failed to send temp kvs");
/* cancel the step to avoid tasks hang */
slurm_kill_job_step(job_info.step_id.job_id,
job_info.step_id.step_id,
SIGKILL, 0);
} else {
/* now waiting for the KVS response from upstream */
waiting_kvs_resp = 1;
}
}
debug3("mpi/pmi2: out _handle_kvs_fence, tasks_to_wait=%d, "
"children_to_wait=%d", tasks_to_wait, children_to_wait);
return rc;
}
/*
 * Handle a PMI2 "kvs-get" request.
 *
 * Looks the requested key up with kvs_get() and replies with rc=0. The
 * reply has found=true and the value if the key exists, otherwise
 * found=false.
 *
 * Returns the result of client_resp_send().
 */
static int
_handle_kvs_get(int fd, int lrank, client_req_t *req)
{
	char *lookup_key = NULL;
	char *found_val;
	client_resp_t *reply;
	int send_rc;

	debug3("mpi/pmi2: in _handle_kvs_get");

	client_req_parse_body(req);
	client_req_get_str(req, KEY_KEY, &lookup_key);
	found_val = kvs_get(lookup_key);
	xfree(lookup_key);

	reply = client_resp_new();
	client_resp_append(reply, CMD_KEY"="KVSGETRESP_CMD";" RC_KEY"=0;");
	if (found_val == NULL)
		client_resp_append(reply, FOUND_KEY"="FALSE_VAL";");
	else
		client_resp_append(reply, FOUND_KEY"="TRUE_VAL";"
				   VALUE_KEY"=%s;", found_val);
	send_rc = client_resp_send(reply, fd);
	client_resp_free(reply);

	debug3("mpi/pmi2: out _handle_kvs_get");
	return send_rc;
}
/*
 * Handle a PMI2 "info-getnodeattr" request.
 *
 * If the attribute is missing and the client asked to wait, the request is
 * handed to enqueue_nag_req() and no reply is sent now. Presumably the
 * reply is sent once the attribute is put — confirm in info.c. Otherwise
 * the reply goes out immediately with rc=0 and found=true plus the value,
 * or found=false.
 *
 * Returns the result of enqueue_nag_req() or client_resp_send().
 */
static int
_handle_info_getnodeattr(int fd, int lrank, client_req_t *req)
{
	char *attr_key = NULL;
	char *attr_val;
	bool should_wait = false;
	client_resp_t *reply;
	int result;

	debug3("mpi/pmi2: in _handle_info_getnodeattr from lrank %d", lrank);

	client_req_parse_body(req);
	client_req_get_str(req, KEY_KEY, &attr_key);
	client_req_get_bool(req, WAIT_KEY, &should_wait);

	attr_val = node_attr_get(attr_key);
	if (attr_val == NULL && should_wait) {
		result = enqueue_nag_req(fd, lrank, attr_key);
	} else {
		reply = client_resp_new();
		client_resp_append(reply, CMD_KEY"="GETNODEATTRRESP_CMD";"
				   RC_KEY"=0;" );
		if (attr_val != NULL)
			client_resp_append(reply, FOUND_KEY"="TRUE_VAL";"
					   VALUE_KEY"=%s;", attr_val);
		else
			client_resp_append(reply, FOUND_KEY"="FALSE_VAL";" );
		result = client_resp_send(reply, fd);
		client_resp_free(reply);
	}
	xfree(attr_key);

	debug3("mpi/pmi2: out _handle_info_getnodeattr");
	return result;
}
/*
 * Handle a PMI2 "info-putnodeattr" request.
 *
 * Stores KEY=VALUE as a node attribute with node_attr_put() and replies
 * with that call's rc. A request that lacks either the key or the value
 * is rejected with rc=PMI2_ERR_INVALID_ARG and nothing is stored.
 *
 * Returns the result of client_resp_send().
 */
static int
_handle_info_putnodeattr(int fd, int lrank, client_req_t *req)
{
	/*
	 * Both pointers must start out NULL. If a field is absent from the
	 * request, the output pointer may be left unwritten, and calling
	 * xfree() on an uninitialized pointer (as the original code could) is
	 * undefined behavior.
	 */
	char *key = NULL, *val = NULL;
	client_resp_t *resp;
	int rc;

	debug3("mpi/pmi2: in _handle_info_putnodeattr");
	client_req_parse_body(req);
	client_req_get_str(req, KEY_KEY, &key);
	client_req_get_str(req, VALUE_KEY, &val);

	if ((key == NULL) || (val == NULL)) {
		error(KEY_KEY" or "VALUE_KEY" missing in putnodeattr command");
		rc = PMI2_ERR_INVALID_ARG;
	} else {
		rc = node_attr_put(key, val);
	}
	xfree(key);
	xfree(val);

	resp = client_resp_new();
	client_resp_append(resp,
			   CMD_KEY"="PUTNODEATTRRESP_CMD";" RC_KEY"=%d;", rc);
	rc = client_resp_send(resp, fd);
	client_resp_free(resp);

	debug3("mpi/pmi2: out _handle_info_putnodeattr");
	return rc;
}
/*
 * Handle a PMI2 "info-getjobattr" request.
 *
 * Looks the key up with job_attr_get() and replies with rc=0. The reply
 * has found=true and the value when the attribute exists, otherwise
 * found=false.
 *
 * Returns the result of client_resp_send().
 */
static int
_handle_info_getjobattr(int fd, int lrank, client_req_t *req)
{
	char *attr_key = NULL;
	char *attr_val;
	client_resp_t *reply;
	int send_rc;

	debug3("mpi/pmi2: in _handle_info_getjobattr");

	client_req_parse_body(req);
	client_req_get_str(req, KEY_KEY, &attr_key);
	attr_val = job_attr_get(attr_key);
	xfree(attr_key);

	reply = client_resp_new();
	client_resp_append(reply, CMD_KEY"="GETJOBATTRRESP_CMD";" RC_KEY"=0;");
	if (attr_val == NULL)
		client_resp_append(reply, FOUND_KEY"="FALSE_VAL";");
	else
		client_resp_append(reply,
				   FOUND_KEY"="TRUE_VAL";" VALUE_KEY"=%s;",
				   attr_val);
	send_rc = client_resp_send(reply, fd);
	client_resp_free(reply);

	debug3("mpi/pmi2: out _handle_info_getjobattr");
	return send_rc;
}
/*
 * Handle a PMI2 "name-publish" request.
 *
 * Passes the service name and port to name_publish_up() and replies with
 * that call's return code.
 *
 * Returns the result of client_resp_send().
 */
static int
_handle_name_publish(int fd, int lrank, client_req_t *req)
{
	char *svc_name = NULL;
	char *svc_port = NULL;
	client_resp_t *reply;
	int pub_rc, send_rc;

	debug3("mpi/pmi2: in _handle_publish_name");

	client_req_parse_body(req);
	client_req_get_str(req, NAME_KEY, &svc_name);
	client_req_get_str(req, PORT_KEY, &svc_port);

	pub_rc = name_publish_up(svc_name, svc_port);
	xfree(svc_name);
	xfree(svc_port);

	reply = client_resp_new();
	client_resp_append(reply, CMD_KEY"="NAMEPUBLISHRESP_CMD";"
			   RC_KEY"=%d;", pub_rc);
	send_rc = client_resp_send(reply, fd);
	client_resp_free(reply);

	debug3("mpi/pmi2: out _handle_publish_name");
	return send_rc;
}
/*
 * Handle a PMI2 "name-unpublish" request.
 *
 * Passes the service name to name_unpublish_up() and replies with that
 * call's return code.
 *
 * Returns the result of client_resp_send().
 */
static int
_handle_name_unpublish(int fd, int lrank, client_req_t *req)
{
	char *svc_name = NULL;
	client_resp_t *reply;
	int unpub_rc, send_rc;

	debug3("mpi/pmi2: in _handle_unpublish_name");

	client_req_parse_body(req);
	client_req_get_str(req, NAME_KEY, &svc_name);

	unpub_rc = name_unpublish_up(svc_name);
	xfree(svc_name);

	reply = client_resp_new();
	client_resp_append(reply, CMD_KEY"="NAMEUNPUBLISHRESP_CMD";"
			   RC_KEY"=%d;", unpub_rc);
	send_rc = client_resp_send(reply, fd);
	client_resp_free(reply);

	debug3("mpi/pmi2: out _handle_unpublish_name");
	return send_rc;
}
/*
 * Handle a PMI2 "name-lookup" request.
 *
 * Resolves the service name with name_lookup_up(). The reply has rc=0 and
 * the port when the name was found, otherwise rc=1. The port string
 * returned by the lookup is freed here.
 *
 * Returns the result of client_resp_send().
 */
static int
_handle_name_lookup(int fd, int lrank, client_req_t *req)
{
	char *svc_name = NULL;
	char *svc_port = NULL;
	client_resp_t *reply;
	int send_rc;

	debug3("mpi/pmi2: in _handle_lookup_name");

	client_req_parse_body(req);
	client_req_get_str(req, NAME_KEY, &svc_name);
	svc_port = name_lookup_up(svc_name);

	reply = client_resp_new();
	client_resp_append(reply, CMD_KEY"="NAMELOOKUPRESP_CMD";");
	if (svc_port != NULL)
		client_resp_append(reply, RC_KEY"=0;"VALUE_KEY"=%s;",
				   svc_port);
	else
		client_resp_append(reply, RC_KEY"=1;");
	send_rc = client_resp_send(reply, fd);
	client_resp_free(reply);

	xfree(svc_name);
	xfree(svc_port);

	debug3("mpi/pmi2: out _handle_lookup_name");
	return send_rc;
}
/*
 * Handle a PMI2 "spawn" request.
 *
 * Parses the spawn request and forwards it to srun. On success the task's
 * fd/lrank are recorded with spawn_psr_enqueue() under the sequence number
 * srun returned. Presumably the spawn reply is delivered to the task later
 * — confirm in spawn.c.
 *
 * Failures are reported to the task immediately with a spawn response
 * carrying an error rc:
 *   - an unparsable request gets PMI2_ERR_INVALID_ARGS;
 *   - a failed exchange with srun gets the transport error code;
 *   - a spawn that srun rejected gets srun's rc.
 *
 * Returns SLURM_ERROR on any failure, otherwise the (successful) rc of
 * spawn_req_send_to_srun().
 */
static int
_handle_spawn(int fd, int lrank, client_req_t *req)
{
	int rc, err;
	spawn_req_t *spawn_req = NULL;
	spawn_resp_t *spawn_resp = NULL;
	client_resp_t *task_resp;

	debug3("mpi/pmi2: in _handle_spawn");

	client_req_parse_body(req);
	spawn_req = client_req_parse_spawn_req(req);
	if (spawn_req == NULL) {
		task_resp = client_resp_new();
		client_resp_append(task_resp, CMD_KEY"="SPAWNRESP_CMD";"
				   RC_KEY"=%d;"
				   ERRMSG_KEY"=invalid command;",
				   PMI2_ERR_INVALID_ARGS);
		client_resp_send(task_resp, fd);
		client_resp_free(task_resp);
		return SLURM_ERROR;
	}

	/* a resp will be send back from srun.
	 * this will not be forwarded to the tasks */
	rc = spawn_req_send_to_srun(spawn_req, &spawn_resp);

	/*
	 * spawn_resp can only be trusted when the exchange with srun
	 * succeeded. If it failed, spawn_resp may still be NULL, and the
	 * original unconditional spawn_resp->rc dereference would crash.
	 */
	if (rc != SLURM_SUCCESS)
		err = rc;
	else if (spawn_resp == NULL)
		err = SLURM_ERROR;
	else
		err = spawn_resp->rc;

	if (err != SLURM_SUCCESS) {
		if (rc != SLURM_SUCCESS)
			error("mpi/pmi2: failed to send spawn request to srun");
		task_resp = client_resp_new();
		client_resp_append(task_resp, CMD_KEY"="SPAWNRESP_CMD";"
				   RC_KEY"=%d;"
				   ERRMSG_KEY"=spawn failed;",
				   err);
		client_resp_send(task_resp, fd);
		client_resp_free(task_resp);
		spawn_req_free(spawn_req);
		if (spawn_resp != NULL)
			spawn_resp_free(spawn_resp);
		debug("mpi/pmi2: spawn failed");
		return SLURM_ERROR;
	}

	debug3("mpi/pmi2: spawn request sent to srun");
	spawn_psr_enqueue(spawn_resp->seq, fd, lrank, NULL);
	spawn_req_free(spawn_req);
	spawn_resp_free(spawn_resp);

	debug3("mpi/pmi2: out _handle_spawn");
	return rc;
}
/**************************************************/
extern int
handle_pmi2_cmd(int fd, int lrank)
{
int i, len;
char len_buf[7], *buf = NULL;
client_req_t *req = NULL;
int rc = SLURM_SUCCESS;
debug3("mpi/pmi2: in handle_pmi2_cmd");
safe_read(fd, len_buf, 6);
len_buf[6] = '\0';
len = atoi(len_buf);
buf = xmalloc(len + 1);
safe_read(fd, buf, len);
buf[len] = '\0';
debug2("mpi/pmi2: got client request: %s %s", len_buf, buf);
if (!len) {
/*
* This is an invalid request.
*
* The most likely cause of an invalid client request is a
* second PMI2_Init call from the client end. This arrives
* first as a "cmd=init" call. Ideally, we'd capture that
* request, and respond with "cmd=response_to_init" with the rc
* field set to PMI2_ERR_INIT and expect the client to cleanup
* and die correctly.
*
* However - Slurm's libpmi2 has historically ignored the rc
* value and immediately sends the FULLINIT_CMD regardless, and
* then waits for a response to that. Rather than construct
* two successive error messages, this call will send back
* "cmd=finalize-response" back that will trigger the desired
* error handling paths, and then tears down the connection
* for good measure.
*/
_handle_finalize(fd, 0, NULL);
xfree(buf);
return SLURM_ERROR;
}
req = client_req_init(len, buf);
if (req == NULL) {
error("mpi/pmi2: invalid client request");
return SLURM_ERROR;
}
i = 0;
while (pmi2_cmd_handlers[i].cmd != NULL) {
if (!xstrcmp(req->cmd, pmi2_cmd_handlers[i].cmd))
break;
i ++;
}
if (pmi2_cmd_handlers[i].cmd == NULL) {
error("mpi/pmi2: invalid pmi2 command received: '%s'", req->cmd);
rc = SLURM_ERROR;
} else {
rc = pmi2_cmd_handlers[i].handler(fd, lrank, req);
}
client_req_free(req);
debug3("mpi/pmi2: out handle_pmi2_cmd");
return rc;
rwfail:
xfree(buf);
return SLURM_ERROR;
}