/*****************************************************************************\
** pmix_dconn_ucx.c - PMIx direct UCX connection
*****************************************************************************
* Copyright (C) 2017 Mellanox Technologies. All rights reserved.
* Written by Artem Polyakov <artpol84@gmail.com, artemp@mellanox.com>.
*
* This file is part of Slurm, a resource management program.
* For details, see <https://slurm.schedmd.com/>.
* Please also read the included file: DISCLAIMER.
*
* Slurm is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with Slurm; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#include "pmixp_dconn.h"
#include "pmixp_dconn_ucx.h"
#include <unistd.h>
#include <dlfcn.h>
#include <ucp/api/ucp.h>
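/*
 * Direct connection implementation built on UCX tag matching. A single
 * UCP context and worker are shared by all connections; per-node
 * endpoints are created lazily in _ucx_connect(). Progress is driven
 * from the eio loop through two file descriptors: the UCX wakeup fd
 * (_server_fd) and a local service pipe that is used to re-trigger
 * progress while requests are still pending.
 */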
/* local variables */
static int _service_pipe[2];
#define PMIXP_UCX_LIST_PREALLOC 16
static pmixp_list_t _free_list;
static pmixp_rlist_t _rcv_pending, _snd_pending;
static pmixp_rlist_t _rcv_complete, _snd_complete;
static int _server_fd = -1;
static bool _direct_hdr_set = false;
static pmixp_p2p_data_t _direct_hdr;
static void *_host_hdr = NULL;
pthread_mutex_t _ucx_worker_lock;
/* UCX objects */
ucp_context_h ucp_context;
ucp_worker_h ucp_worker;
static ucp_address_t *_ucx_addr;
static size_t _ucx_alen;
typedef enum {
PMIXP_UCX_ACTIVE = 0,
PMIXP_UCX_COMPLETE,
PMIXP_UCX_FAILED
} pmixp_ucx_status_t;
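/* Request descriptor reserved by UCX at the beginning of every
 * non-blocking request (see ucp_params.request_size) and prepared by
 * request_init().
 */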
typedef struct {
volatile pmixp_ucx_status_t status;
void *buffer;
size_t len;
void *msg;
} pmixp_ucx_req_t;
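/* Per-connection private data kept for every remote node: the UCX
 * endpoint, the peer worker address and the messages queued before the
 * connection was established.
 */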
typedef struct {
int nodeid;
bool connected;
ucp_ep_h server_ep;
void *ucx_addr;
size_t ucx_alen;
pmixp_p2p_data_t eng_hdr;
pmixp_rlist_t pending;
} pmixp_dconn_ucx_t;
static inline void _recv_req_release(pmixp_ucx_req_t *req)
{
if (req->buffer) {
xfree(req->buffer);
}
memset(req, 0, sizeof(*req));
ucp_request_release(req);
}
static void request_init(void *request)
{
	pmixp_ucx_req_t *req = (pmixp_ucx_req_t *) request;
	/* zero the descriptor first, then mark it active so the
	 * memset doesn't clobber the status */
	memset(req, 0, sizeof(*req));
	req->status = PMIXP_UCX_ACTIVE;
}
static void send_handle(void *request, ucs_status_t status)
{
pmixp_ucx_req_t *req = (pmixp_ucx_req_t *) request;
if (UCS_OK == status){
req->status = PMIXP_UCX_COMPLETE;
} else {
PMIXP_ERROR("UCX send request failed: %s",
ucs_status_string(status));
req->status = PMIXP_UCX_FAILED;
}
}
static void recv_handle(void *request, ucs_status_t status,
ucp_tag_recv_info_t *info)
{
pmixp_ucx_req_t *req = (pmixp_ucx_req_t *) request;
if (UCS_OK == status){
req->status = PMIXP_UCX_COMPLETE;
} else {
PMIXP_ERROR("UCX send request failed: %s",
ucs_status_string(status));
req->status = PMIXP_UCX_FAILED;
}
}
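/* eio callbacks: _epoll_ops watches the UCX wakeup fd, _progress_ops
 * watches the read end of the service pipe (see _ucx_regio()).
 */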
static bool _epoll_readable(eio_obj_t *obj);
static int _epoll_read(eio_obj_t *obj, list_t *objs);
static struct io_operations _epoll_ops = {
.readable = _epoll_readable,
.handle_read = _epoll_read
};
static bool _progress_readable(eio_obj_t *obj);
static int _progress_read(eio_obj_t *obj, list_t *objs);
static struct io_operations _progress_ops = {
.readable = _progress_readable,
.handle_read = _progress_read
};
static void *_ucx_init(int nodeid, pmixp_p2p_data_t direct_hdr);
static void _ucx_fini(void *_priv);
static int _ucx_connect(void *_priv, void *ep_data, size_t ep_len,
void *init_msg);
static int _ucx_send(void *_priv, void *msg);
static void _ucx_regio(eio_handle_t *h);
static void *_ucx_lib_handler = NULL;
static int _load_ucx_lib()
{
	/* At the time of this writing UCX does not support fork()
	 * and its memory hooks cause memory corruption in forked
	 * processes. To avoid that, disable the memory hooks before
	 * loading the UCX library.
	 */
#if UCP_API_VERSION < UCP_VERSION(1, 5)
setenv("UCX_MEM_MMAP_RELOC", "no", 1);
#endif
setenv("UCX_MEM_MALLOC_HOOKS", "no", 1);
setenv("UCX_MEM_MALLOC_RELOC", "no", 1);
setenv("UCX_MEM_EVENTS", "no", 1);
#ifdef PMIXP_UCX_LIBPATH
	/* If this Slurm build doesn't allow RPATHs, try to open the
	 * library by the full path obtained from autoconf.
	 */
char *full_path = NULL;
xstrfmtcat(full_path, "%s/libucp.so.0", PMIXP_UCX_LIBPATH);
_ucx_lib_handler = dlopen(full_path, RTLD_LAZY | RTLD_GLOBAL);
xfree(full_path);
if (_ucx_lib_handler) {
/* successful, exit now */
return SLURM_SUCCESS;
}
	/* Fall through and check whether libucp.so can be found in a
	 * location known to the dynamic linker.
	 */
#endif
_ucx_lib_handler = dlopen("libucp.so.0", RTLD_LAZY | RTLD_GLOBAL);
if (!_ucx_lib_handler) {
char *err = dlerror();
PMIXP_ERROR("Cannot open UCX lib: %s", (err) ? err : "unknown");
return SLURM_ERROR;
}
return SLURM_SUCCESS;
}
static void _unload_ucx_lib()
{
xassert(_ucx_lib_handler);
if (_ucx_lib_handler) {
dlclose(_ucx_lib_handler);
}
}
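/* One-time plugin setup: load libucp, create the UCP context and
 * worker, and return this node's worker address through ep_data/ep_len
 * as the endpoint data for the direct-connect handshake.
 */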
int pmixp_dconn_ucx_prepare(pmixp_dconn_handlers_t *handlers,
char **ep_data, size_t *ep_len)
{
ucp_config_t *config;
ucs_status_t status;
ucp_params_t ucp_params;
ucp_worker_params_t worker_params;
	/* UCX is not loaded by default, so load the library
	 * explicitly here.
	 */
_load_ucx_lib();
slurm_mutex_init(&_ucx_worker_lock);
/* container of the free elements */
pmixp_list_init(&_free_list);
/* Containers for the non-completed requests */
pmixp_rlist_init(&_rcv_pending, &_free_list, PMIXP_UCX_LIST_PREALLOC);
pmixp_rlist_init(&_snd_pending, &_free_list, PMIXP_UCX_LIST_PREALLOC);
/* Temp lists to hold completed requests for _ucx_progress */
pmixp_rlist_init(&_snd_complete, &_free_list, PMIXP_UCX_LIST_PREALLOC);
pmixp_rlist_init(&_rcv_complete, &_free_list, PMIXP_UCX_LIST_PREALLOC);
status = ucp_config_read("SLURM", NULL, &config);
if (status != UCS_OK) {
PMIXP_ERROR("Fail to read UCX config: %s",
ucs_status_string(status));
return SLURM_ERROR;
}
ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_WAKEUP;
ucp_params.request_size = sizeof(pmixp_ucx_req_t);
ucp_params.request_init = request_init;
ucp_params.request_cleanup = NULL;
ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES |
UCP_PARAM_FIELD_REQUEST_SIZE |
UCP_PARAM_FIELD_REQUEST_INIT |
UCP_PARAM_FIELD_REQUEST_CLEANUP;
status = ucp_init(&ucp_params, config, &ucp_context);
ucp_config_release(config);
if (status != UCS_OK) {
PMIXP_ERROR("Fail to init UCX: %s",
ucs_status_string(status));
return SLURM_ERROR;
}
worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
worker_params.thread_mode = UCS_THREAD_MODE_MULTI;
status = ucp_worker_create(ucp_context, &worker_params, &ucp_worker);
if (status != UCS_OK) {
PMIXP_ERROR("Fail to create UCX worker: %s",
ucs_status_string(status));
goto err_worker;
}
status = ucp_worker_get_address(ucp_worker, &_ucx_addr, &_ucx_alen);
if (status != UCS_OK) {
PMIXP_ERROR("Fail to get UCX address: %s",
ucs_status_string(status));
goto err_addr;
}
status = ucp_worker_get_efd(ucp_worker, &_server_fd);
if (status != UCS_OK) {
PMIXP_ERROR("Fail to get UCX epoll fd: %s",
ucs_status_string(status));
goto err_efd;
}
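	/* The service pipe lets _activate_progress() wake up the eio
	 * thread when requests are still pending (see _epoll_readable()).
	 */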
if (pipe(_service_pipe) != 0) {
PMIXP_ERROR("Failed to open service pipe: %m");
goto err_pipe;
}
memset(handlers, 0, sizeof(*handlers));
handlers->connect = _ucx_connect;
handlers->init = _ucx_init;
handlers->fini = _ucx_fini;
handlers->send = _ucx_send;
handlers->getio = NULL;
handlers->regio = _ucx_regio;
*ep_data = (void*)_ucx_addr;
*ep_len = (uint16_t)_ucx_alen;
return SLURM_SUCCESS;
err_pipe:
err_efd:
ucp_worker_release_address(ucp_worker, _ucx_addr);
err_addr:
ucp_worker_destroy(ucp_worker);
err_worker:
ucp_cleanup(ucp_context);
return SLURM_ERROR;
}
static void _release_send_requests(pmixp_rlist_t *l)
{
size_t count = pmixp_rlist_count(l);
size_t i;
for (i=0; i<count; i++) {
pmixp_ucx_req_t *req;
req = (pmixp_ucx_req_t*)pmixp_rlist_deq(l);
xassert(req);
ucp_request_cancel(ucp_worker, req);
if (req->buffer) {
/* NOTE: since we are finalizing, we don't really care
* about the status */
_direct_hdr.send_complete(req->msg, PMIXP_P2P_REGULAR,
SLURM_SUCCESS);
}
ucp_request_release(req);
}
}
static void _release_recv_requests(pmixp_rlist_t *l)
{
size_t count = pmixp_rlist_count(l);
size_t i;
for (i=0; i < count; i++) {
pmixp_ucx_req_t *req;
req = (pmixp_ucx_req_t*)pmixp_rlist_deq(l);
xassert(req);
ucp_request_cancel(ucp_worker, req);
_recv_req_release(req);
}
}
void pmixp_dconn_ucx_stop()
{
slurm_mutex_lock(&_ucx_worker_lock);
_release_send_requests(&_snd_pending);
_release_send_requests(&_snd_complete);
_release_recv_requests(&_rcv_pending);
_release_recv_requests(&_rcv_complete);
slurm_mutex_unlock(&_ucx_worker_lock);
}
void pmixp_dconn_ucx_finalize()
{
pmixp_list_elem_t *elem;
size_t count, i;
xassert(_direct_hdr_set);
pmixp_rlist_fini(&_snd_pending);
pmixp_rlist_fini(&_snd_complete);
pmixp_rlist_fini(&_rcv_pending);
pmixp_rlist_fini(&_rcv_complete);
	/* All elements from the lists above have now settled in the
	 * free list. Release them.
	 */
count = pmixp_list_count(&_free_list);
for (i=0; i < count; i++) {
elem = pmixp_list_deq(&_free_list);
pmixp_list_elem_free(elem);
}
/* cleanup UCX */
ucp_worker_destroy(ucp_worker);
ucp_cleanup(ucp_context);
/* unload UCX lib */
_unload_ucx_lib();
slurm_mutex_destroy(&_ucx_worker_lock);
}
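/* Write a single byte to the service pipe so the eio thread wakes up
 * and calls _progress_read(), which drains the pipe and runs
 * _ucx_progress() again.
 */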
static int _activate_progress()
{
char buf = 'c';
int rc = write(_service_pipe[1], &buf, sizeof(buf));
if (sizeof(buf) != rc) {
PMIXP_ERROR("Unable to activate UCX progress");
if (0 > rc) {
return rc;
} else {
return SLURM_ERROR;
}
}
return SLURM_SUCCESS;
}
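/* Hand a fully received message to the engine: unpack the remote header
 * into _host_hdr, wrap the payload in a buf_t and pass it to the
 * new_msg() callback.
 */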
void _ucx_process_msg(char *buffer, size_t len)
{
xassert(_direct_hdr_set);
_direct_hdr.hdr_unpack_cb(buffer, _host_hdr);
buf_t *buf = create_buf(buffer, len);
set_buf_offset(buf, _direct_hdr.rhdr_net_size);
_direct_hdr.new_msg(_host_hdr, buf);
}
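/* Single progress pass over the shared worker:
 * 1. probe for new messages and post non-blocking receives for them;
 * 2. move completed send/receive requests to the completion lists;
 * 3. run the engine callbacks for completed requests with the worker
 *    lock dropped;
 * 4. hand the completed requests back to UCX.
 * Returns true if any event was observed, so callers can loop until the
 * worker is drained.
 */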
static bool _ucx_progress()
{
pmixp_ucx_req_t *req = NULL;
ucp_tag_message_h msg_tag;
ucp_tag_recv_info_t info_tag;
pmixp_list_elem_t *elem;
bool new_msg = false;
size_t count, i;
int events_observed = 0;
/* protected progress of UCX */
slurm_mutex_lock(&_ucx_worker_lock);
ucp_worker_progress(ucp_worker);
/* check for new messages */
while (1) {
msg_tag = ucp_tag_probe_nb(ucp_worker, 1, 0,
1, &info_tag);
if (NULL == msg_tag) {
break;
}
events_observed++;
char *msg = xmalloc(info_tag.length);
pmixp_ucx_req_t *req = (pmixp_ucx_req_t*)
ucp_tag_msg_recv_nb(ucp_worker, (void*)msg,
info_tag.length,
ucp_dt_make_contig(1),
msg_tag, recv_handle);
		if (UCS_PTR_IS_ERR(req)) {
			PMIXP_ERROR("ucp_tag_msg_recv_nb failed: %s",
				    ucs_status_string(UCS_PTR_STATUS(req)));
			/* the receive was never posted, so drop the buffer */
			xfree(msg);
			continue;
		}
new_msg = true;
req->buffer = msg;
req->len = info_tag.length;
if (PMIXP_UCX_ACTIVE == req->status) {
			/* The receive did not complete inline, so this
			 * message is large enough to be worth another
			 * progress pass */
pmixp_rlist_enq(&_rcv_pending, req);
} else {
pmixp_rlist_enq(&_rcv_complete, req);
}
}
if (!new_msg && pmixp_rlist_empty(&_rcv_pending) &&
pmixp_rlist_empty(&_snd_pending)) {
goto exit;
}
/* Check pending requests */
elem = pmixp_rlist_begin(&_rcv_pending);
while (pmixp_rlist_end(&_rcv_pending) != elem) {
req = PMIXP_LIST_VAL(elem);
if (PMIXP_UCX_ACTIVE == req->status){
/* go to the next element */
elem = pmixp_rlist_next(&_rcv_pending, elem);
} else {
/* grab this element for processing */
elem = pmixp_rlist_rem(&_rcv_pending, elem);
pmixp_rlist_enq(&_rcv_complete, req);
events_observed++;
}
}
elem = pmixp_rlist_begin(&_snd_pending);
while (pmixp_rlist_end(&_snd_pending) != elem) {
req = PMIXP_LIST_VAL(elem);
if (PMIXP_UCX_ACTIVE == req->status){
/* go to the next element */
elem = pmixp_rlist_next(&_snd_pending, elem);
} else {
/* grab this element for processing */
elem = pmixp_rlist_rem(&_snd_pending, elem);
pmixp_rlist_enq(&_snd_complete, req);
events_observed++;
}
}
slurm_mutex_unlock(&_ucx_worker_lock);
/* process sends and receives unlocked */
elem = pmixp_rlist_begin(&_rcv_complete);
while (pmixp_rlist_end(&_rcv_complete) != elem) {
req = PMIXP_LIST_VAL(elem);
/* Skip failed receives
* TODO: what more can we do? */
if (PMIXP_UCX_FAILED != req->status){
_ucx_process_msg(req->buffer, req->len);
}
elem = pmixp_rlist_next(&_rcv_complete, elem);
}
elem = pmixp_rlist_begin(&_snd_complete);
while (pmixp_rlist_end(&_snd_complete) != elem) {
req = PMIXP_LIST_VAL(elem);
int rc = SLURM_SUCCESS;
if (PMIXP_UCX_FAILED == req->status){
rc = SLURM_ERROR;
}
xassert(_direct_hdr_set);
if (req->buffer) {
_direct_hdr.send_complete(req->msg,
PMIXP_P2P_REGULAR, rc);
}
elem = pmixp_rlist_next(&_snd_complete, elem);
}
slurm_mutex_lock(&_ucx_worker_lock);
count = pmixp_rlist_count(&_rcv_complete);
for (i=0; i < count; i++){
req = (pmixp_ucx_req_t *)pmixp_rlist_deq(&_rcv_complete);
/* release request to UCX */
memset(req, 0, sizeof(*req));
ucp_request_release(req);
}
count = pmixp_rlist_count(&_snd_complete);
for (i=0; i < count; i++) {
req = (pmixp_ucx_req_t *)pmixp_rlist_deq(&_snd_complete);
/* release request to UCX */
memset(req, 0, sizeof(*req));
ucp_request_release(req);
}
exit:
slurm_mutex_unlock(&_ucx_worker_lock);
return !!(events_observed);
}
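/* Called by eio before sleeping on the UCX wakeup fd. Drain the worker
 * first; if requests are still pending, skip epoll and poke the service
 * pipe instead. Otherwise arm the wakeup fd, retrying while
 * ucp_worker_arm() reports UCS_ERR_BUSY (more events to process).
 */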
static bool _epoll_readable(eio_obj_t *obj)
{
ucs_status_t status = UCS_ERR_BUSY;
/* sanity check */
xassert(NULL != obj );
if (obj->shutdown) {
/* corresponding connection will be
* cleaned up during plugin finalize
*/
return false;
}
do {
/* process all outstanding events */
while (_ucx_progress());
if (pmixp_rlist_count(&_rcv_pending) ||
pmixp_rlist_count(&_snd_pending)){
			/* If there are pending requests, don't wait on
			 * epoll; trigger poll interruption through the
			 * service pipe instead.
			 */
_activate_progress();
return false;
}
/* arm the poll fd */
slurm_mutex_lock(&_ucx_worker_lock);
status = ucp_worker_arm(ucp_worker);
slurm_mutex_unlock(&_ucx_worker_lock);
} while (UCS_ERR_BUSY == status);
return true;
}
static int _epoll_read(eio_obj_t *obj, list_t *objs)
{
if (obj->shutdown) {
/* corresponding connection will be
* cleaned up during plugin finalize
*/
return 0;
}
/* process all outstanding events */
while (_ucx_progress());
return 0;
}
static bool _progress_readable(eio_obj_t *obj)
{
/* sanity check */
xassert(NULL != obj );
if (obj->shutdown) {
/* corresponding connection will be
* cleaned up during plugin finalize
*/
return false;
}
	/* All of the control logic lives in _epoll_readable(); here we
	 * only report that this object is readable.
	 */
return true;
}
static int _progress_read(eio_obj_t *obj, list_t *objs)
{
char buf;
/* sanity check */
xassert(NULL != obj );
	if (obj->shutdown) {
/* corresponding connection will be
* cleaned up during plugin finalize
*/
return 0;
}
/* empty the pipe */
while (sizeof(buf) == read(_service_pipe[0], &buf, sizeof(buf)));
/* process all outstanding events */
while (_ucx_progress());
return 0;
}
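/* Per-connection constructor. The first call also captures the engine
 * header callbacks (_direct_hdr) and allocates the scratch header
 * buffer used when unpacking incoming messages.
 */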
static void *_ucx_init(int nodeid, pmixp_p2p_data_t direct_hdr)
{
pmixp_dconn_ucx_t *priv = xmalloc(sizeof(pmixp_dconn_ucx_t));
priv->nodeid = nodeid;
priv->connected = false;
if (!_direct_hdr_set) {
_direct_hdr = direct_hdr;
_direct_hdr_set = true;
_host_hdr = xmalloc(_direct_hdr.rhdr_host_size);
}
slurm_mutex_lock(&_ucx_worker_lock);
pmixp_rlist_init(&priv->pending, &_free_list, PMIXP_UCX_LIST_PREALLOC);
slurm_mutex_unlock(&_ucx_worker_lock);
return (void*)priv;
}
static void _ucx_fini(void *_priv)
{
pmixp_dconn_ucx_t *priv = (pmixp_dconn_ucx_t *)_priv;
if (priv->connected) {
xfree(priv->ucx_addr);
slurm_mutex_lock(&_ucx_worker_lock);
ucp_ep_destroy(priv->server_ep);
slurm_mutex_unlock(&_ucx_worker_lock);
} else {
slurm_mutex_lock(&_ucx_worker_lock);
pmixp_rlist_fini(&priv->pending);
slurm_mutex_unlock(&_ucx_worker_lock);
}
xfree(priv);
}
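/* Create the endpoint for the remote worker address provided by the
 * caller, then flush any messages that were queued on this connection
 * before it was established.
 */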
static int _ucx_connect(void *_priv, void *ep_data, size_t ep_len,
void *init_msg)
{
pmixp_dconn_ucx_t *priv = (pmixp_dconn_ucx_t *)_priv;
ucp_ep_params_t ep_params;
ucs_status_t status;
int rc = SLURM_SUCCESS;
size_t i, count;
priv->ucx_addr = ep_data;
priv->ucx_alen = ep_len;
/* Connect to the server */
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
ep_params.address = priv->ucx_addr;
slurm_mutex_lock(&_ucx_worker_lock);
status = ucp_ep_create(ucp_worker, &ep_params, &priv->server_ep);
if (status != UCS_OK) {
PMIXP_ERROR("ucp_ep_create failed: %s",
ucs_status_string(status));
xfree(priv->ucx_addr);
slurm_mutex_unlock(&_ucx_worker_lock);
return SLURM_ERROR;
}
priv->connected = true;
/* Enqueue initialization message if requested */
if (init_msg) {
pmixp_rlist_push(&priv->pending, init_msg);
}
slurm_mutex_unlock(&_ucx_worker_lock);
/* we need to send data while being unlocked */
if (SLURM_SUCCESS == rc){
pmixp_list_elem_t *elem = NULL;
/* Send all pending messages */
elem = pmixp_rlist_begin(&priv->pending);
while (pmixp_rlist_end(&priv->pending) != elem) {
_ucx_send(_priv, PMIXP_LIST_VAL(elem));
elem = pmixp_rlist_next(&priv->pending, elem);
}
}
slurm_mutex_lock(&_ucx_worker_lock);
count = pmixp_rlist_count(&priv->pending);
for (i=0; i < count; i++) {
		/* The message has already been processed and its value
		 * is no longer valid; just dequeue it from the list so
		 * the service structures are cleaned up.
		 */
(void)pmixp_rlist_deq(&priv->pending);
}
	/* There will be no more pending messages, so the pending list
	 * can safely be released now.
	 */
pmixp_rlist_fini(&priv->pending);
slurm_mutex_unlock(&_ucx_worker_lock);
return rc;
}
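/* Send one engine message. Messages submitted before the connection is
 * established are queued on the per-connection pending list. Otherwise
 * issue a non-blocking tagged send (tag = local nodeid); if UCX
 * completes it inline, report completion right away, else park the
 * request on _snd_pending and kick the progress machinery.
 */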
static int _ucx_send(void *_priv, void *msg)
{
pmixp_dconn_ucx_t *priv = (pmixp_dconn_ucx_t *)_priv;
int rc = SLURM_SUCCESS;
bool release = false;
slurm_mutex_lock(&_ucx_worker_lock);
if (!priv->connected) {
pmixp_rlist_enq(&priv->pending, msg);
} else {
pmixp_ucx_req_t *req = NULL;
xassert(_direct_hdr_set);
char *mptr = _direct_hdr.buf_ptr(msg);
size_t msize = _direct_hdr.buf_size(msg);
req = (pmixp_ucx_req_t*)
ucp_tag_send_nb(priv->server_ep,
(void*)mptr, msize,
ucp_dt_make_contig(1),
pmixp_info_nodeid(), send_handle);
if (UCS_PTR_IS_ERR(req)) {
PMIXP_ERROR("Unable to send UCX message: %s\n",
ucs_status_string(UCS_PTR_STATUS(req)));
goto exit;
} else if (UCS_OK == UCS_PTR_STATUS(req)) {
/* defer release until we unlock ucp worker */
release = true;
} else {
req->msg = msg;
req->buffer = mptr;
req->len = msize;
pmixp_rlist_enq(&_snd_pending, (void*)req);
_activate_progress();
}
}
exit:
slurm_mutex_unlock(&_ucx_worker_lock);
if (release){
_direct_hdr.send_complete(msg, PMIXP_P2P_INLINE,
SLURM_SUCCESS);
}
return rc;
}
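/* Register both progress file descriptors with the provided eio handle:
 * the service pipe (read end) and the UCX wakeup fd.
 */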
static void _ucx_regio(eio_handle_t *h)
{
eio_obj_t *obj;
fd_set_nonblocking(_service_pipe[0]);
fd_set_nonblocking(_service_pipe[1]);
fd_set_close_on_exec(_service_pipe[0]);
fd_set_close_on_exec(_service_pipe[1]);
obj = eio_obj_create(_service_pipe[0], &_progress_ops, (void *)(-1));
eio_new_initial_obj(h, obj);
obj = eio_obj_create(_server_fd, &_epoll_ops, (void *)(-1));
eio_new_initial_obj(h, obj);
}