/*****************************************************************************\
 **  tree.c - PMI tree communication handling code
 *****************************************************************************
 *  Copyright (C) 2011-2012 National University of Defense Technology.
 *  Written by Hongjia Cao <hjcao@nudt.edu.cn>.
 *  All rights reserved.
 *  Portions copyright (C) 2014 Institute of Semiconductor Physics
 *                     Siberian Branch of Russian Academy of Science
 *  Written by Artem Y. Polyakov <artpol84@gmail.com>.
 *  All rights reserved.
 *  Portions copyright (C) 2015 Mellanox Technologies Inc.
 *  Written by Artem Y. Polyakov <artemp@mellanox.com>.
 *  All rights reserved.
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
\*****************************************************************************/

#include <errno.h>
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>

#include "src/common/slurm_xlator.h"
#include "src/common/slurm_protocol_interface.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/xmalloc.h"

#include "kvs.h"
#include "spawn.h"
#include "client.h"
#include "setup.h"
#include "pmi.h"
#include "nameserv.h"
#include "ring.h"

static int _handle_kvs_fence(int fd, buf_t *buf);
static int _handle_kvs_fence_resp(int fd, buf_t *buf);
static int _handle_spawn(int fd, buf_t *buf);
static int _handle_spawn_resp(int fd, buf_t *buf);
static int _handle_name_publish(int fd, buf_t *buf);
static int _handle_name_unpublish(int fd, buf_t *buf);
static int _handle_name_lookup(int fd, buf_t *buf);
static int _handle_ring(int fd, buf_t *buf);
static int _handle_ring_resp(int fd, buf_t *buf);

static uint32_t  spawned_srun_ports_size = 0;
static uint16_t *spawned_srun_ports = NULL;


static int (*tree_cmd_handlers[]) (int fd, buf_t *buf) = {
	_handle_kvs_fence,
	_handle_kvs_fence_resp,
	_handle_spawn,
	_handle_spawn_resp,
	_handle_name_publish,
	_handle_name_unpublish,
	_handle_name_lookup,
	_handle_ring,
	_handle_ring_resp,
	NULL
};

static char *tree_cmd_names[] = {
	"TREE_CMD_KVS_FENCE",
	"TREE_CMD_KVS_FENCE_RESP",
	"TREE_CMD_SPAWN",
	"TREE_CMD_SPAWN_RESP",
	"TREE_CMD_NAME_PUBLISH",
	"TREE_CMD_NAME_UNPUBLISH",
	"TREE_CMD_NAME_LOOKUP",
	"TREE_CMD_RING",
	"TREE_CMD_RING_RESP",
	NULL,
};

static int _handle_kvs_fence(int fd, buf_t *buf)
{
	uint32_t from_nodeid, num_children, temp32, seq;
	char *from_node = NULL;
	int rc = SLURM_SUCCESS;

	safe_unpack32(&from_nodeid, buf);
	safe_unpackstr_xmalloc(&from_node, &temp32, buf);
	safe_unpack32(&num_children, buf);
	safe_unpack32(&seq, buf);

	debug3("mpi/pmi2: in _handle_kvs_fence, from node %u(%s) representing"
	       " %u offspring, seq=%u", from_nodeid, from_node, num_children,
	       seq);
	if (seq != kvs_seq) {
		error("mpi/pmi2: invalid kvs seq from node %u(%s) ignored, "
		      "expect %u got %u",
		      from_nodeid, from_node, kvs_seq, seq);
		goto out;
	}
	if (seq == tree_info.children_kvs_seq[from_nodeid]) {
		info("mpi/pmi2: duplicate KVS_FENCE request from node %u(%s) "
		      "ignored, seq=%u", from_nodeid, from_node, seq);
		goto out;
	}
	tree_info.children_kvs_seq[from_nodeid] = seq;

	if (tasks_to_wait == 0 && children_to_wait == 0) {
		tasks_to_wait = job_info.ltasks;
		children_to_wait = tree_info.num_children;
	}
	children_to_wait -= num_children;

	temp_kvs_merge(buf);

	if ((children_to_wait == 0) && (tasks_to_wait == 0)) {
		rc = temp_kvs_send();
		if (rc != SLURM_SUCCESS) {
			if (in_stepd()) {
				error("mpi/pmi2: failed to send temp kvs"
				      " to %s",
				      tree_info.parent_node ?: "srun");
				send_kvs_fence_resp_to_clients(
					rc,
					"mpi/pmi2: failed to send temp kvs");
			} else {
				error("mpi/pmi2: failed to send temp kvs"
				      " to compute nodes");
			}
			/* cancel the step to avoid tasks hang */
			slurm_kill_job_step(job_info.step_id.job_id,
					    job_info.step_id.step_id,
					    SIGKILL, 0);
		} else {
			if (in_stepd())
				waiting_kvs_resp = 1;
		}
	}
	debug3("mpi/pmi2: out _handle_kvs_fence, tasks_to_wait=%d, "
	       "children_to_wait=%d", tasks_to_wait, children_to_wait);
out:
	xfree(from_node);
	return rc;

unpack_error:
	error("mpi/pmi2: failed to unpack kvs fence message");
	rc = SLURM_ERROR;
	goto out;
}

static int _handle_kvs_fence_resp(int fd, buf_t *buf)
{
	char *key, *val, *errmsg = NULL;
	int rc = SLURM_SUCCESS;
	uint32_t temp32, seq;

	debug3("mpi/pmi2: in _handle_kvs_fence_resp");

	safe_unpack32(&seq, buf);
	if (seq == kvs_seq - 2) {
		debug("mpi/pmi2: duplicate KVS_FENCE_RESP "
		      "seq %d kvs_seq %d from srun ignored", seq, kvs_seq);
		return rc;
	} else if (seq != kvs_seq - 1) {
		error("mpi/pmi2: invalid kvs seq from srun, expect %u got %u",
		      kvs_seq - 1, seq);
		rc = SLURM_ERROR;;
		errmsg = "mpi/pmi2: invalid kvs seq from srun";
		goto resp;
	}
	if (! waiting_kvs_resp) {
		debug("mpi/pmi2: duplicate KVS_FENCE_RESP from srun ignored");
		return rc;
	} else {
		waiting_kvs_resp = 0;
	}

	temp32 = remaining_buf(buf);
	debug3("mpi/pmi2: buf length: %u", temp32);
	/* put kvs into local hash */
	while (remaining_buf(buf) > 0) {
		safe_unpackstr_xmalloc(&key, &temp32, buf);
		safe_unpackstr_xmalloc(&val, &temp32, buf);
		kvs_put(key, val);
		//temp32 = remaining_buf(buf);
		xfree(key);
		xfree(val);
	}

resp:
	send_kvs_fence_resp_to_clients(rc, errmsg);
	if (rc != SLURM_SUCCESS) {
		slurm_kill_job_step(job_info.step_id.job_id,
				    job_info.step_id.step_id, SIGKILL, 0);
	}
	return rc;

unpack_error:
	error("mpi/pmi2: unpack kvs error in fence resp");
	rc = SLURM_ERROR;
	errmsg = "mpi/pmi2: unpack kvs error in fence resp";
	goto resp;
}

/* only called in srun */
static int _handle_spawn(int fd, buf_t *buf)
{
	int rc;
	spawn_req_t *req = NULL;
	spawn_resp_t *resp = NULL;

	debug3("mpi/pmi2: in _handle_spawn");

	rc = spawn_req_unpack(&req, buf);
	if (rc != SLURM_SUCCESS) {
		error("mpi/pmi2: failed to unpack spawn request spawn cmd");
		/* We lack a hostname to send response below.
		 resp = spawn_resp_new();
		resp->rc = rc;
		rc = spawn_resp_send_to_stepd(resp, req->from_node);
		spawn_resp_free(resp); */
		return rc;
	}

	/* assign a sequence number */
	req->seq = spawn_seq_next();
	resp = spawn_resp_new();
	resp->seq = req->seq;
	resp->jobid = NULL;
	resp->error_cnt = 0;

	/* fork srun */
	rc = spawn_job_do_spawn(req);
	if (rc != SLURM_SUCCESS) {
		error("mpi/pmi2: failed to spawn job");
		resp->rc = rc;
	} else {
		spawn_psr_enqueue(resp->seq, -1, -1, req->from_node);
		resp->rc = SLURM_SUCCESS; /* temp resp */
	}

	spawn_resp_send_to_fd(resp, fd);

	spawn_req_free(req);
	spawn_resp_free(resp);

	debug3("mpi/pmi2: out _handle_spawn");
	return rc;
}

static int
_send_task_spawn_resp_pmi20(spawn_resp_t *spawn_resp, int task_fd,
			    int task_lrank)
{
	int i, rc;
	client_resp_t *task_resp;
	char *error_codes = NULL;

	task_resp = client_resp_new();
	client_resp_append(task_resp,
			   CMD_KEY"="SPAWNRESP_CMD";"
			   RC_KEY"=%d;"
			   JOBID_KEY"=%s;",
			   spawn_resp->rc,
			   spawn_resp->jobid);
	/* seems that simple2pmi does not consider rc */
	if (spawn_resp->rc != SLURM_SUCCESS) {
		xstrfmtcat(error_codes, "%d", spawn_resp->rc);
	}
	if (spawn_resp->error_cnt > 0) {
		if (error_codes) {
			xstrfmtcat(error_codes, ",%d", spawn_resp->error_codes[0]);
		} else {
			xstrfmtcat(error_codes, "%d", spawn_resp->error_codes[0]);
		}

		for (i = 1; i < spawn_resp->error_cnt; i ++) {
			xstrfmtcat(error_codes, ",%d",
				   spawn_resp->error_codes[i]);
		}
	}
	if (error_codes) {
		client_resp_append(task_resp, ERRCODES_KEY"=%s;",
				   error_codes);
		xfree(error_codes);
	}

	rc = client_resp_send(task_resp, task_fd);
	client_resp_free(task_resp);
	return rc;
}

static int
_send_task_spawn_resp_pmi11(spawn_resp_t *spawn_resp, int task_fd,
			    int task_lrank)
{
	int i, rc;
	client_resp_t *task_resp;
	char *error_codes = NULL;

	task_resp = client_resp_new();
	client_resp_append(task_resp,
			   CMD_KEY"="SPAWNRESULT_CMD" "
			   RC_KEY"=%d "
			   JOBID_KEY"=%s", /* JOBID_KEY is not required */
			   spawn_resp->rc,
			   spawn_resp->jobid);

	if (spawn_resp->rc != SLURM_SUCCESS) {
		xstrfmtcat(error_codes, "%d", spawn_resp->rc);
	}
	if (spawn_resp->error_cnt > 0) {
		if (error_codes) {
			xstrfmtcat(error_codes, ",%d", spawn_resp->error_codes[0]);
		} else {
			xstrfmtcat(error_codes, "%d", spawn_resp->error_codes[0]);
		}

		for (i = 1; i < spawn_resp->error_cnt; i ++) {
			xstrfmtcat(error_codes, ",%d",
				   spawn_resp->error_codes[i]);
		}
	}
	if (error_codes) {
		client_resp_append(task_resp, " "ERRCODES_KEY"=%s\n",
				   error_codes);
		xfree(error_codes);
	} else {
		client_resp_append(task_resp, "\n");
	}

	rc = client_resp_send(task_resp, task_fd);
	client_resp_free(task_resp);
	return rc;
}

/* called in stepd and srun */
static int _handle_spawn_resp(int fd, buf_t *buf)
{
	int rc, task_fd, task_lrank;
	spawn_resp_t *spawn_resp;
	char *from_node = NULL;

	debug3("mpi/pmi2: in _handle_spawn_resp");

	rc = spawn_resp_unpack(&spawn_resp, buf);
	if (rc != SLURM_SUCCESS) {
		error("mpi/pmi2: failed to unpack spawn response tree cmd");
		return SLURM_ERROR;
	}

	rc = spawn_psr_dequeue(spawn_resp->seq, &task_fd, &task_lrank, &from_node);
	if (rc != SLURM_SUCCESS) {
		error("mpi/pmi2: spawn response not matched in psr list");
		return SLURM_ERROR;
	}

	if (from_node == NULL) { /* stepd */
		debug3("mpi/pmi2: spawned tasks of %s launched",
		       spawn_resp->jobid);
		if (is_pmi20()) {
			_send_task_spawn_resp_pmi20(spawn_resp, task_fd, task_lrank);
		} else if (is_pmi11()) {
			_send_task_spawn_resp_pmi11(spawn_resp, task_fd, task_lrank);
		}
	} else {		/* srun */
		debug3("mpi/pmi2: spawned tasks of %s launched",
		       spawn_resp->jobid);
		spawned_srun_ports = xrealloc(spawned_srun_ports,
					      spawn_resp->seq *
					      sizeof(uint16_t));
		spawned_srun_ports_size = spawn_resp->seq; /* seq start from 1 */
		spawned_srun_ports[spawn_resp->seq - 1] = spawn_resp->pmi_port;
		/* forward resp to stepd */
		spawn_resp_send_to_stepd(spawn_resp, &from_node);
		xfree(from_node);
	}
	spawn_resp_free(spawn_resp);

	return rc;
}


/* name serv handlers called only in srun */
static int _handle_name_publish(int fd, buf_t *buf)
{
	int rc;
	uint32_t tmp32;
	char *name = NULL, *port = NULL;
	buf_t *resp_buf = NULL;

	debug3("mpi/pmi2: in _handle_name_publish");

	safe_unpackstr_xmalloc(&name, &tmp32, buf);
	safe_unpackstr_xmalloc(&port, &tmp32, buf);

	if (tree_info.srun_addr)
		rc = name_publish_up(name, port);
	else
		rc = name_publish_local(name, port);
out:
	xfree(name);
	xfree(port);
	resp_buf = init_buf(32);
	pack32((uint32_t) rc, resp_buf);
	rc = slurm_msg_sendto(fd, get_buf_data(resp_buf),
			      get_buf_offset(resp_buf));
	FREE_NULL_BUFFER(resp_buf);

	debug3("mpi/pmi2: out _handle_name_publish");
	return rc;

unpack_error:
	rc = SLURM_ERROR;
	goto out;
}

static int _handle_name_unpublish(int fd, buf_t *buf)
{
	int rc;
	uint32_t tmp32;
	char *name = NULL;
	buf_t *resp_buf = NULL;

	debug3("mpi/pmi2: in _handle_name_unpublish");

	safe_unpackstr_xmalloc(&name, &tmp32, buf);

	if (tree_info.srun_addr)
		rc = name_unpublish_up(name);
	else
		rc = name_unpublish_local(name);
out:
	xfree(name);
	resp_buf = init_buf(32);
	pack32((uint32_t) rc, resp_buf);
	rc = slurm_msg_sendto(fd, get_buf_data(resp_buf),
			      get_buf_offset(resp_buf));
	FREE_NULL_BUFFER(resp_buf);

	debug3("mpi/pmi2: out _handle_name_unpublish");
	return rc;

unpack_error:
	rc = SLURM_ERROR;
	goto out;
}

static int _handle_name_lookup(int fd, buf_t *buf)
{
	int rc = SLURM_SUCCESS, rc2;
	uint32_t tmp32;
	char *name = NULL, *port = NULL;
	buf_t *resp_buf = NULL;

	debug3("mpi/pmi2: in _handle_name_lookup");

	safe_unpackstr_xmalloc(&name, &tmp32, buf);

	if (tree_info.srun_addr)
		port = name_lookup_up(name);
	else
		port = name_lookup_local(name);
out:
	resp_buf = init_buf(1024);
	packstr(port, resp_buf);
	rc2 = slurm_msg_sendto(fd, get_buf_data(resp_buf),
			       get_buf_offset(resp_buf));
	rc = MAX(rc, rc2);
	FREE_NULL_BUFFER(resp_buf);
	xfree(name);
	xfree(port);

	debug3("mpi/pmi2: out _handle_name_lookup");
	return rc;

unpack_error:
	rc = SLURM_ERROR;
	goto out;
}

/* handles ring_in message from one of our stepd children */
static int _handle_ring(int fd, buf_t *buf)
{
	uint32_t rank, count, temp32;
	char *left  = NULL;
	char *right = NULL;
	int ring_id;
	int rc = SLURM_SUCCESS;

        debug3("mpi/pmi2: in _handle_ring");

	/* TODO: do we need ntoh translation? */

	/* data consists of:
         *   uint32_t rank  - tree rank of stepd process that sent message
         *   uint32_t count - ring in count value
         *   string   left  - ring in left value
         *   string   right - ring in right value */
	safe_unpack32(&rank,  buf);
	safe_unpack32(&count, buf);
	safe_unpackstr_xmalloc(&left,  &temp32, buf);
	safe_unpackstr_xmalloc(&right, &temp32, buf);

	/* lookup ring_id for this child */
	ring_id = pmix_ring_id_by_rank(rank);

	/* check that we got a valid child id */
	if (ring_id == -1) {
		error("mpi/pmi2: received ring_in message from unknown child %d", rank);
		rc = SLURM_ERROR;
		goto out;
	}

	/* execute ring in operation */
	rc = pmix_ring_in(ring_id, count, left, right);

out:
	/* free strings unpacked from message */
	xfree(left);
	xfree(right);
        debug3("mpi/pmi2: out _handle_ring");
	return rc;

unpack_error:
	error("mpi/pmi2: failed to unpack ring in message");
	rc = SLURM_ERROR;
	goto out;
}

/* handles ring_out messages coming in from parent in stepd tree */
static int _handle_ring_resp(int fd, buf_t *buf)
{
	uint32_t count, temp32;
	char *left  = NULL;
	char *right = NULL;
	int rc = SLURM_SUCCESS;

        debug3("mpi/pmi2: in _handle_ring_resp");

	/* TODO: need ntoh translation? */
	/* data consists of:
         *   uint32_t count - ring out count value
         *   string   left  - ring out left value
         *   string   right - ring out right value */
	safe_unpack32(&count, buf);
	safe_unpackstr_xmalloc(&left,  &temp32, buf);
	safe_unpackstr_xmalloc(&right, &temp32, buf);

	/* execute ring out operation */
	rc = pmix_ring_out(count, left, right);

out:
	/* free strings unpacked from message */
	xfree(left);
	xfree(right);
        debug3("mpi/pmi2: out _handle_ring_resp");
	return rc;

unpack_error:
	error("mpi/pmi2: failed to unpack ring out message");
	rc = SLURM_ERROR;
	goto out;
}

/**************************************************************/
extern int
handle_tree_cmd(int fd)
{
	char *req_buf = NULL;
	uint32_t len;
	buf_t *buf = NULL;
	uint16_t cmd;
	int rc;

	debug3("mpi/pmi2: in handle_tree_cmd");

	safe_read(fd, &len, sizeof(uint32_t));
	len = ntohl(len);

	safe_read(fd, &cmd, sizeof(uint16_t));
	cmd = ntohs(cmd);
	if (cmd >= TREE_CMD_COUNT) {
		error("mpi/pmi2: invalid tree req command");
		return SLURM_ERROR;
	}

	len -= sizeof(cmd);
	req_buf = xmalloc(len + 1);
	safe_read(fd, req_buf, len);
	buf = create_buf(req_buf, len); /* req_buf taken by buf */

	debug3("mpi/pmi2: got tree cmd: %hu(%s)", cmd, tree_cmd_names[cmd]);
	rc = tree_cmd_handlers[cmd](fd, buf);
	FREE_NULL_BUFFER(buf);
	debug3("mpi/pmi2: out handle_tree_cmd");
	return rc;

rwfail:
	xfree(req_buf);
	return SLURM_ERROR;
}

extern int
tree_msg_to_srun(uint32_t len, char *msg)
{
	int fd, rc;

	fd = slurm_open_stream(tree_info.srun_addr, true);
	if (fd < 0)
		return SLURM_ERROR;
	rc = slurm_msg_sendto(fd, msg, len);
	if (rc == len) /* all data sent */
		rc = SLURM_SUCCESS;
	else
		rc = SLURM_ERROR;
	close(fd);
	return rc;
}

extern int tree_msg_to_srun_with_resp(uint32_t len, char *msg, buf_t **resp_ptr)
{
	int fd, rc;
	buf_t *buf = NULL;
	char *data = NULL;

	xassert(resp_ptr != NULL);

	fd = slurm_open_stream(tree_info.srun_addr, true);
	if (fd < 0)
		return SLURM_ERROR;
	rc = slurm_msg_sendto(fd, msg, len);
	if (rc == len) { 	/* all data sent */
		safe_read(fd, &len, sizeof(len));
		len = ntohl(len);
		data = xmalloc(len);
		safe_read(fd, data, len);
		buf = create_buf(data, len);
		*resp_ptr = buf;
		rc = SLURM_SUCCESS;
	} else {
		rc = SLURM_ERROR;
	}
	close(fd);
	return rc;

rwfail:
	close (fd);
	xfree(data);
	return SLURM_ERROR;
}

extern int
tree_msg_to_spawned_sruns(uint32_t len, char *msg)
{
	int i = 0, rc = SLURM_SUCCESS, fd = -1, sent=0;
	slurm_addr_t srun_addr;

	for (i = 0; i < spawned_srun_ports_size; i ++) {
		if (spawned_srun_ports[i] == 0)
			continue;

		slurm_set_addr(&srun_addr, spawned_srun_ports[i], "127.0.0.1");
		fd = slurm_open_stream(&srun_addr, true);
		if (fd < 0)
			return SLURM_ERROR;
		sent = slurm_msg_sendto(fd, msg, len);
		if (sent != len)
			rc = SLURM_ERROR;
		close(fd);
	}
	return rc;
}
