/*****************************************************************************\
 **  tree.c - PMI tree communication handling code
 *****************************************************************************
 *  Copyright (C) 2011-2012 National University of Defense Technology.
 *  Written by Hongjia Cao <hjcao@nudt.edu.cn>.
 *  All rights reserved.
 *  Portions copyright (C) 2014 Institute of Semiconductor Physics
 *  Siberian Branch of Russian Academy of Science
 *  Written by Artem Y. Polyakov <artpol84@gmail.com>.
 *  All rights reserved.
 *  Portions copyright (C) 2015 Mellanox Technologies Inc.
 *  Written by Artem Y. Polyakov <artemp@mellanox.com>.
 *  All rights reserved.
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version. If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/

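/*
 * Overview: srun and the stepds of a job step form a communication tree,
 * with srun at the root.  Tree commands (KVS fence, spawn, name service,
 * ring) travel from the tasks up toward srun, and the corresponding
 * *_RESP commands travel back down.  Messages arrive via
 * handle_tree_cmd(), which dispatches to the _handle_*() functions below.
 */
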
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>

#include "src/common/slurm_xlator.h"
#include "src/common/slurm_protocol_interface.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/xmalloc.h"

#include "kvs.h"
#include "spawn.h"
#include "client.h"
#include "setup.h"
#include "pmi.h"
#include "nameserv.h"
#include "ring.h"

static int _handle_kvs_fence(int fd, Buf buf);
static int _handle_kvs_fence_resp(int fd, Buf buf);
static int _handle_spawn(int fd, Buf buf);
static int _handle_spawn_resp(int fd, Buf buf);
static int _handle_name_publish(int fd, Buf buf);
static int _handle_name_unpublish(int fd, Buf buf);
static int _handle_name_lookup(int fd, Buf buf);
static int _handle_ring(int fd, Buf buf);
static int _handle_ring_resp(int fd, Buf buf);

static uint32_t spawned_srun_ports_size = 0;
static uint16_t *spawned_srun_ports = NULL;

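/*
 * Dispatch table for tree commands, indexed by the command value read in
 * handle_tree_cmd().  The entry order must match the TREE_CMD_* values
 * and stay in sync with tree_cmd_names[] below.
 */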
static int (*tree_cmd_handlers[]) (int fd, Buf buf) = {
	_handle_kvs_fence,
	_handle_kvs_fence_resp,
	_handle_spawn,
	_handle_spawn_resp,
	_handle_name_publish,
	_handle_name_unpublish,
	_handle_name_lookup,
	_handle_ring,
	_handle_ring_resp,
	NULL
};

static char *tree_cmd_names[] = {
	"TREE_CMD_KVS_FENCE",
	"TREE_CMD_KVS_FENCE_RESP",
	"TREE_CMD_SPAWN",
	"TREE_CMD_SPAWN_RESP",
	"TREE_CMD_NAME_PUBLISH",
	"TREE_CMD_NAME_UNPUBLISH",
	"TREE_CMD_NAME_LOOKUP",
	"TREE_CMD_RING",
	"TREE_CMD_RING_RESP",
	NULL
};

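/*
 * Handle a KVS_FENCE request sent up the tree by a child node.  The
 * payload carries the sender's node id and hostname, the number of
 * offspring it represents, and a fence sequence number used to reject
 * stale requests and ignore duplicates.  Runs in both stepd and srun,
 * as the in_stepd() checks below indicate.
 */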
static int
_handle_kvs_fence(int fd, Buf buf)
{
	uint32_t from_nodeid, num_children, temp32, seq;
	char *from_node = NULL;
	int rc = SLURM_SUCCESS;

	safe_unpack32(&from_nodeid, buf);
	safe_unpackstr_xmalloc(&from_node, &temp32, buf);
	safe_unpack32(&num_children, buf);
	safe_unpack32(&seq, buf);

	debug3("mpi/pmi2: in _handle_kvs_fence, from node %u(%s) representing"
	       " %u offspring, seq=%u", from_nodeid, from_node, num_children,
	       seq);
	if (seq != kvs_seq) {
		error("mpi/pmi2: invalid kvs seq from node %u(%s) ignored, "
		      "expect %u got %u",
		      from_nodeid, from_node, kvs_seq, seq);
		goto out;
	}
	if (seq == tree_info.children_kvs_seq[from_nodeid]) {
		info("mpi/pmi2: duplicate KVS_FENCE request from node %u(%s) "
		     "ignored, seq=%u", from_nodeid, from_node, seq);
		goto out;
	}
	tree_info.children_kvs_seq[from_nodeid] = seq;

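	/*
	 * First fence request of this epoch: arm the counters.
	 * tasks_to_wait counts local PMI clients (presumably decremented
	 * by the client request handlers as tasks reach the fence) and
	 * children_to_wait counts the offspring reporting through our
	 * children.  Once both reach zero the merged KVS is sent up the
	 * tree via temp_kvs_send().
	 */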
	if (tasks_to_wait == 0 && children_to_wait == 0) {
		tasks_to_wait = job_info.ltasks;
		children_to_wait = tree_info.num_children;
	}
	children_to_wait -= num_children;

	temp_kvs_merge(buf);

	if ((children_to_wait == 0) && (tasks_to_wait == 0)) {
		rc = temp_kvs_send();
		if (rc != SLURM_SUCCESS) {
			if (in_stepd()) {
				error("mpi/pmi2: failed to send temp kvs"
				      " to %s",
				      tree_info.parent_node ?: "srun");
				send_kvs_fence_resp_to_clients(
					rc,
					"mpi/pmi2: failed to send temp kvs");
			} else {
				error("mpi/pmi2: failed to send temp kvs"
				      " to compute nodes");
			}
			/* cancel the step to avoid hanging tasks */
			slurm_kill_job_step(job_info.jobid, job_info.stepid,
					    SIGKILL);
		} else {
			if (in_stepd())
				waiting_kvs_resp = 1;
		}
	}
	debug3("mpi/pmi2: out _handle_kvs_fence, tasks_to_wait=%d, "
	       "children_to_wait=%d", tasks_to_wait, children_to_wait);
out:
	xfree(from_node);
	return rc;

unpack_error:
	error("mpi/pmi2: failed to unpack kvs fence message");
	rc = SLURM_ERROR;
	goto out;
}

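/*
 * Handle a KVS_FENCE_RESP coming back down the tree.  The expected
 * sequence number is kvs_seq - 1 (kvs_seq is presumably advanced when
 * the merged KVS is sent up in temp_kvs_send()), so kvs_seq - 2
 * identifies a leftover duplicate from the previous fence epoch.
 */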
static int
_handle_kvs_fence_resp(int fd, Buf buf)
{
	char *key = NULL, *val = NULL, *errmsg = NULL;
	int rc = SLURM_SUCCESS;
	uint32_t temp32, seq;

	debug3("mpi/pmi2: in _handle_kvs_fence_resp");

	safe_unpack32(&seq, buf);
	if (seq == kvs_seq - 2) {
		debug("mpi/pmi2: duplicate KVS_FENCE_RESP "
		      "seq %u kvs_seq %u from srun ignored", seq, kvs_seq);
		return rc;
	} else if (seq != kvs_seq - 1) {
		error("mpi/pmi2: invalid kvs seq from srun, expect %u got %u",
		      kvs_seq - 1, seq);
		rc = SLURM_ERROR;
		errmsg = "mpi/pmi2: invalid kvs seq from srun";
		goto resp;
	}
	if (!waiting_kvs_resp) {
		debug("mpi/pmi2: duplicate KVS_FENCE_RESP from srun ignored");
		return rc;
	} else {
		waiting_kvs_resp = 0;
	}

	temp32 = remaining_buf(buf);
	debug3("mpi/pmi2: buf length: %u", temp32);
	/* put kvs into local hash */
	while (remaining_buf(buf) > 0) {
		safe_unpackstr_xmalloc(&key, &temp32, buf);
		safe_unpackstr_xmalloc(&val, &temp32, buf);
		kvs_put(key, val);
		xfree(key);
		xfree(val);
	}

resp:
	send_kvs_fence_resp_to_clients(rc, errmsg);
	if (rc != SLURM_SUCCESS) {
		slurm_kill_job_step(job_info.jobid, job_info.stepid, SIGKILL);
	}
	return rc;

unpack_error:
	error("mpi/pmi2: unpack kvs error in fence resp");
	rc = SLURM_ERROR;
	errmsg = "mpi/pmi2: unpack kvs error in fence resp";
	xfree(key);	/* may have been set before the failed unpack */
	goto resp;
}

/* only called in srun */
static int
_handle_spawn(int fd, Buf buf)
{
	int rc;
	spawn_req_t *req = NULL;
	spawn_resp_t *resp = NULL;

	debug3("mpi/pmi2: in _handle_spawn");

	rc = spawn_req_unpack(&req, buf);
	if (rc != SLURM_SUCCESS) {
		error("mpi/pmi2: failed to unpack spawn request spawn cmd");
		/* the request failed to unpack, so the originating node is
		 * unknown and no error response can be sent back */
		return rc;
	}

	/* assign a sequence number */
	req->seq = spawn_seq_next();
	resp = spawn_resp_new();
	resp->seq = req->seq;
	resp->jobid = NULL;
	resp->error_cnt = 0;

	/* fork srun */
	rc = spawn_job_do_spawn(req);
	if (rc != SLURM_SUCCESS) {
		error("mpi/pmi2: failed to spawn job");
		resp->rc = rc;
	} else {
		spawn_psr_enqueue(resp->seq, -1, -1, req->from_node);
		resp->rc = SLURM_SUCCESS;	/* temporary resp, the real
						 * result arrives later */
	}

	spawn_resp_send_to_fd(resp, fd);

	spawn_req_free(req);
	spawn_resp_free(resp);

	debug3("mpi/pmi2: out _handle_spawn");
	return rc;
}

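/*
 * Send the spawn result to the requesting task in PMI 2.0 wire syntax,
 * i.e. ';'-separated key=value pairs.  Assuming the usual expansions of
 * the *_KEY and *_CMD macros from pmi.h, the reply looks roughly like:
 *
 *   cmd=spawn-response;rc=0;jobid=<jobid>;errcodes=0,0;
 *
 * where errcodes is appended only when there is an error to report.
 */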
static int
_send_task_spawn_resp_pmi20(spawn_resp_t *spawn_resp, int task_fd,
			    int task_lrank)
{
	int i, rc;
	client_resp_t *task_resp;
	char *error_codes = NULL;

	task_resp = client_resp_new();
	client_resp_append(task_resp,
			   CMD_KEY"="SPAWNRESP_CMD";"
			   RC_KEY"=%d;"
			   JOBID_KEY"=%s;",
			   spawn_resp->rc,
			   spawn_resp->jobid);
	/* simple2pmi does not seem to consider rc */
	if (spawn_resp->rc != SLURM_SUCCESS) {
		xstrfmtcat(error_codes, "%d", spawn_resp->rc);
	}
	if (spawn_resp->error_cnt > 0) {
		if (error_codes) {
			xstrfmtcat(error_codes, ",%d",
				   spawn_resp->error_codes[0]);
		} else {
			xstrfmtcat(error_codes, "%d",
				   spawn_resp->error_codes[0]);
		}

		for (i = 1; i < spawn_resp->error_cnt; i++) {
			xstrfmtcat(error_codes, ",%d",
				   spawn_resp->error_codes[i]);
		}
	}
	if (error_codes) {
		client_resp_append(task_resp, ERRCODES_KEY"=%s;",
				   error_codes);
		xfree(error_codes);
	}

	rc = client_resp_send(task_resp, task_fd);
	client_resp_free(task_resp);
	return rc;
}

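/*
 * PMI 1.1 variant of the spawn response.  Unlike the PMI 2.0 format
 * above, fields are separated by spaces and the message is terminated
 * by a newline rather than built from ';'-separated pairs.
 */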
static int
_send_task_spawn_resp_pmi11(spawn_resp_t *spawn_resp, int task_fd,
			    int task_lrank)
{
	int i, rc;
	client_resp_t *task_resp;
	char *error_codes = NULL;

	task_resp = client_resp_new();
	client_resp_append(task_resp,
			   CMD_KEY"="SPAWNRESULT_CMD" "
			   RC_KEY"=%d "
			   JOBID_KEY"=%s",	/* JOBID_KEY is not required */
			   spawn_resp->rc,
			   spawn_resp->jobid);

	if (spawn_resp->rc != SLURM_SUCCESS) {
		xstrfmtcat(error_codes, "%d", spawn_resp->rc);
	}
	if (spawn_resp->error_cnt > 0) {
		if (error_codes) {
			xstrfmtcat(error_codes, ",%d",
				   spawn_resp->error_codes[0]);
		} else {
			xstrfmtcat(error_codes, "%d",
				   spawn_resp->error_codes[0]);
		}

		for (i = 1; i < spawn_resp->error_cnt; i++) {
			xstrfmtcat(error_codes, ",%d",
				   spawn_resp->error_codes[i]);
		}
	}
	if (error_codes) {
		client_resp_append(task_resp, " "ERRCODES_KEY"=%s\n",
				   error_codes);
		xfree(error_codes);
	} else {
		client_resp_append(task_resp, "\n");
	}

	rc = client_resp_send(task_resp, task_fd);
	client_resp_free(task_resp);
	return rc;
}

/* called in both stepd and srun */
static int
_handle_spawn_resp(int fd, Buf buf)
{
	int rc, task_fd, task_lrank;
	spawn_resp_t *spawn_resp;
	char *from_node = NULL;

	debug3("mpi/pmi2: in _handle_spawn_resp");

	rc = spawn_resp_unpack(&spawn_resp, buf);
	if (rc != SLURM_SUCCESS) {
		error("mpi/pmi2: failed to unpack spawn response tree cmd");
		return SLURM_ERROR;
	}

	rc = spawn_psr_dequeue(spawn_resp->seq, &task_fd, &task_lrank,
			       &from_node);
	if (rc != SLURM_SUCCESS) {
		error("mpi/pmi2: spawn response not matched in psr list");
		spawn_resp_free(spawn_resp);	/* do not leak on mismatch */
		return SLURM_ERROR;
	}

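	/*
	 * from_node tells us where the spawn request entered the tree: NULL
	 * means the PSR entry was queued locally in a stepd on behalf of a
	 * task (presumably with the task's fd and local rank), so we answer
	 * the task directly; otherwise we are in srun and relay the
	 * response to the stepd named by from_node.
	 */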
	if (from_node == NULL) {	/* stepd */
		debug3("mpi/pmi2: spawned tasks of %s launched",
		       spawn_resp->jobid);
		if (is_pmi20()) {
			_send_task_spawn_resp_pmi20(spawn_resp, task_fd,
						    task_lrank);
		} else if (is_pmi11()) {
			_send_task_spawn_resp_pmi11(spawn_resp, task_fd,
						    task_lrank);
		}
	} else {		/* srun */
		debug3("mpi/pmi2: spawned tasks of %s launched",
		       spawn_resp->jobid);
		/* seq starts from 1; only grow the array, so a response
		 * arriving out of order cannot shrink it */
		if (spawn_resp->seq > spawned_srun_ports_size) {
			spawned_srun_ports = xrealloc(spawned_srun_ports,
						      spawn_resp->seq *
						      sizeof(uint16_t));
			spawned_srun_ports_size = spawn_resp->seq;
		}
		spawned_srun_ports[spawn_resp->seq - 1] = spawn_resp->pmi_port;
		/* forward resp to stepd */
		spawn_resp_send_to_stepd(spawn_resp, &from_node);
		xfree(from_node);
	}
	spawn_resp_free(spawn_resp);

	return rc;
}

/* name serv handlers called only in srun */
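/*
 * In each handler below, a set tree_info.srun_addr means there is a
 * higher-level srun to defer to (the *_up() paths); otherwise this srun
 * owns the name database and serves the request locally.  Either way
 * the packed result is written straight back on fd.
 */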
static int
_handle_name_publish(int fd, Buf buf)
{
	int rc;
	uint32_t tmp32;
	char *name = NULL, *port = NULL;
	Buf resp_buf = NULL;

	debug3("mpi/pmi2: in _handle_name_publish");

	safe_unpackstr_xmalloc(&name, &tmp32, buf);
	safe_unpackstr_xmalloc(&port, &tmp32, buf);

	if (tree_info.srun_addr)
		rc = name_publish_up(name, port);
	else
		rc = name_publish_local(name, port);
out:
	xfree(name);
	xfree(port);
	resp_buf = init_buf(32);
	pack32((uint32_t) rc, resp_buf);
	rc = slurm_msg_sendto(fd, get_buf_data(resp_buf),
			      get_buf_offset(resp_buf));
	free_buf(resp_buf);

	debug3("mpi/pmi2: out _handle_name_publish");
	return rc;

unpack_error:
	rc = SLURM_ERROR;
	goto out;
}

static int
_handle_name_unpublish(int fd, Buf buf)
{
	int rc;
	uint32_t tmp32;
	char *name = NULL;
	Buf resp_buf = NULL;

	debug3("mpi/pmi2: in _handle_name_unpublish");

	safe_unpackstr_xmalloc(&name, &tmp32, buf);

	if (tree_info.srun_addr)
		rc = name_unpublish_up(name);
	else
		rc = name_unpublish_local(name);
out:
	xfree(name);
	resp_buf = init_buf(32);
	pack32((uint32_t) rc, resp_buf);
	rc = slurm_msg_sendto(fd, get_buf_data(resp_buf),
			      get_buf_offset(resp_buf));
	free_buf(resp_buf);

	debug3("mpi/pmi2: out _handle_name_unpublish");
	return rc;

unpack_error:
	rc = SLURM_ERROR;
	goto out;
}

static int
_handle_name_lookup(int fd, Buf buf)
{
	int rc = SLURM_SUCCESS, rc2;
	uint32_t tmp32;
	char *name = NULL, *port = NULL;
	Buf resp_buf = NULL;

	debug3("mpi/pmi2: in _handle_name_lookup");

	safe_unpackstr_xmalloc(&name, &tmp32, buf);

	if (tree_info.srun_addr)
		port = name_lookup_up(name);
	else
		port = name_lookup_local(name);
out:
	resp_buf = init_buf(1024);
	packstr(port, resp_buf);
	rc2 = slurm_msg_sendto(fd, get_buf_data(resp_buf),
			       get_buf_offset(resp_buf));
	rc = MAX(rc, rc2);
	free_buf(resp_buf);
	xfree(name);
	xfree(port);

	debug3("mpi/pmi2: out _handle_name_lookup");
	return rc;

unpack_error:
	rc = SLURM_ERROR;
	goto out;
}

/* handles ring_in message from one of our stepd children */
static int
_handle_ring(int fd, Buf buf)
{
	uint32_t rank, count, temp32;
	char *left = NULL;
	char *right = NULL;
	int ring_id;
	int rc = SLURM_SUCCESS;

	debug3("mpi/pmi2: in _handle_ring");

	/* TODO: do we need ntoh translation? */
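	/* Note: Slurm's pack/unpack helpers used below already convert
	 * integers to/from network byte order internally, so no extra
	 * translation should be needed here. */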

	/* data consists of:
	 * uint32_t rank  - tree rank of stepd process that sent message
	 * uint32_t count - ring_in count value
	 * string   left  - ring_in left value
	 * string   right - ring_in right value */
	safe_unpack32(&rank, buf);
	safe_unpack32(&count, buf);
	safe_unpackstr_xmalloc(&left, &temp32, buf);
	safe_unpackstr_xmalloc(&right, &temp32, buf);

	/* lookup ring_id for this child */
	ring_id = pmix_ring_id_by_rank(rank);

	/* check that we got a valid child id */
	if (ring_id == -1) {
		error("mpi/pmi2: received ring_in message from unknown "
		      "child %u", rank);
		rc = SLURM_ERROR;
		goto out;
	}

	/* execute ring in operation */
	rc = pmix_ring_in(ring_id, count, left, right);

out:
	/* free strings unpacked from message */
	xfree(left);
	xfree(right);
	debug3("mpi/pmi2: out _handle_ring");
	return rc;

unpack_error:
	error("mpi/pmi2: failed to unpack ring_in message");
	rc = SLURM_ERROR;
	goto out;
}

/* handles ring_out messages coming in from our parent in the stepd tree */
static int
_handle_ring_resp(int fd, Buf buf)
{
	uint32_t count, temp32;
	char *left = NULL;
	char *right = NULL;
	int rc = SLURM_SUCCESS;

	debug3("mpi/pmi2: in _handle_ring_resp");

	/* TODO: need ntoh translation? */
	/* data consists of:
	 * uint32_t count - ring_out count value
	 * string   left  - ring_out left value
	 * string   right - ring_out right value */
	safe_unpack32(&count, buf);
	safe_unpackstr_xmalloc(&left, &temp32, buf);
	safe_unpackstr_xmalloc(&right, &temp32, buf);

	/* execute ring out operation */
	rc = pmix_ring_out(count, left, right);

out:
	/* free strings unpacked from message */
	xfree(left);
	xfree(right);
	debug3("mpi/pmi2: out _handle_ring_resp");
	return rc;

unpack_error:
	error("mpi/pmi2: failed to unpack ring_out message");
	rc = SLURM_ERROR;
	goto out;
}

/**************************************************************/
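/*
 * Read one tree command from fd and dispatch it to the matching handler.
 * On the wire a tree message consists of (integers in network byte
 * order):
 *
 *   uint32_t len;       length of everything that follows
 *   uint16_t cmd;       a TREE_CMD_* value indexing the tables above
 *   char     payload[]; len - sizeof(cmd) bytes of packed arguments
 */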
extern int
handle_tree_cmd(int fd)
{
	char *req_buf = NULL;
	uint32_t len;
	Buf buf = NULL;
	uint16_t cmd;
	int rc;

	debug3("mpi/pmi2: in handle_tree_cmd");

	safe_read(fd, &len, sizeof(uint32_t));
	len = ntohl(len);

	safe_read(fd, &cmd, sizeof(uint16_t));
	cmd = ntohs(cmd);
	if (cmd >= TREE_CMD_COUNT) {
		error("mpi/pmi2: invalid tree req command");
		return SLURM_ERROR;
	}

	len -= sizeof(cmd);
	req_buf = xmalloc(len + 1);
	safe_read(fd, req_buf, len);
	buf = create_buf(req_buf, len);	/* req_buf taken by buf */

	debug3("mpi/pmi2: got tree cmd: %hu(%s)", cmd, tree_cmd_names[cmd]);
	rc = tree_cmd_handlers[cmd](fd, buf);
	free_buf(buf);
	debug3("mpi/pmi2: out handle_tree_cmd");
	return rc;

rwfail:
	xfree(req_buf);
	return SLURM_ERROR;
}

extern int
tree_msg_to_srun(uint32_t len, char *msg)
{
	int fd, rc;

	fd = slurm_open_stream(tree_info.srun_addr, true);
	if (fd < 0)
		return SLURM_ERROR;
	rc = slurm_msg_sendto(fd, msg, len);
	if (rc == len)	/* all data sent */
		rc = SLURM_SUCCESS;
	else
		rc = SLURM_ERROR;
	close(fd);
	return rc;
}

extern int
tree_msg_to_srun_with_resp(uint32_t len, char *msg, Buf *resp_ptr)
{
	int fd, rc;
	Buf buf = NULL;
	char *data = NULL;

	xassert(resp_ptr != NULL);

	fd = slurm_open_stream(tree_info.srun_addr, true);
	if (fd < 0)
		return SLURM_ERROR;
	rc = slurm_msg_sendto(fd, msg, len);
	if (rc == len) {	/* all data sent */
		safe_read(fd, &len, sizeof(len));
		len = ntohl(len);
		data = xmalloc(len);
		safe_read(fd, data, len);
		buf = create_buf(data, len);
		*resp_ptr = buf;
		rc = SLURM_SUCCESS;
	} else {
		rc = SLURM_ERROR;
	}
	close(fd);
	return rc;

rwfail:
	close(fd);
	xfree(data);
	return SLURM_ERROR;
}

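/*
 * Broadcast a message to every srun spawned from here.
 * spawned_srun_ports[] is indexed by spawn sequence number minus one; a
 * port of 0 marks an unused slot (xrealloc() zero-fills new elements).
 * Each spawned srun is contacted on the loopback address at the
 * pmi_port recorded when its spawn response arrived.
 */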
extern int
tree_msg_to_spawned_sruns(uint32_t len, char *msg)
{
	int i = 0, rc = SLURM_SUCCESS, fd = -1, sent = 0;
	slurm_addr_t srun_addr;

	for (i = 0; i < spawned_srun_ports_size; i++) {
		if (spawned_srun_ports[i] == 0)
			continue;

		slurm_set_addr(&srun_addr, spawned_srun_ports[i],
			       "127.0.0.1");
		fd = slurm_open_stream(&srun_addr, true);
		if (fd < 0)
			return SLURM_ERROR;
		sent = slurm_msg_sendto(fd, msg, len);
		if (sent != len)
			rc = SLURM_ERROR;
		close(fd);
	}
	return rc;
}