| /* |
| * Copyright (C) 2010 Red Hat, Inc. |
| * |
| * Author: Angus Salkeld <asalkeld@redhat.com> |
| * |
| * This file is part of libqb. |
| * |
| * libqb is free software: you can redistribute it and/or modify |
| * it under the terms of the GNU Lesser General Public License as published by |
| * the Free Software Foundation, either version 2.1 of the License, or |
| * (at your option) any later version. |
| * |
| * libqb is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| * GNU Lesser General Public License for more details. |
| * |
| * You should have received a copy of the GNU Lesser General Public License |
| * along with libqb. If not, see <http://www.gnu.org/licenses/>. |
| */ |
| #include "os_base.h" |
| #include <poll.h> |
| |
| #include "util_int.h" |
| #include "ipc_int.h" |
| #include <qb/qbdefs.h> |
| #include <qb/qbatomic.h> |
| #include <qb/qbipcs.h> |
| |
/* Prototypes for file-local helpers whose definitions appear further down. */
static int32_t new_event_notification(struct qb_ipcs_connection *c);
static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c,
				    int32_t fc_enable);

/* List head that qb_ipcs_create() links every allocated service onto. */
static QB_LIST_DECLARE(qb_ipc_services);
| |
| qb_ipcs_service_t * |
| qb_ipcs_create(const char *name, |
| int32_t service_id, |
| enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers) |
| { |
| struct qb_ipcs_service *s; |
| |
| s = calloc(1, sizeof(struct qb_ipcs_service)); |
| if (s == NULL) { |
| return NULL; |
| } |
| if (type == QB_IPC_NATIVE) { |
| #ifdef DISABLE_IPC_SHM |
| s->type = QB_IPC_SOCKET; |
| #else |
| s->type = QB_IPC_SHM; |
| #endif /* DISABLE_IPC_SHM */ |
| } else { |
| s->type = type; |
| } |
| |
| s->pid = getpid(); |
| s->needs_sock_for_poll = QB_FALSE; |
| s->poll_priority = QB_LOOP_MED; |
| |
| /* Initial alloc ref */ |
| qb_ipcs_ref(s); |
| |
| s->service_id = service_id; |
| (void)strlcpy(s->name, name, NAME_MAX); |
| |
| s->serv_fns.connection_accept = handlers->connection_accept; |
| s->serv_fns.connection_created = handlers->connection_created; |
| s->serv_fns.msg_process = handlers->msg_process; |
| s->serv_fns.connection_closed = handlers->connection_closed; |
| s->serv_fns.connection_destroyed = handlers->connection_destroyed; |
| |
| qb_list_init(&s->connections); |
| qb_list_init(&s->list); |
| qb_list_add(&s->list, &qb_ipc_services); |
| |
| return s; |
| } |
| |
| void |
| qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s, |
| struct qb_ipcs_poll_handlers *handlers) |
| { |
| s->poll_fns.job_add = handlers->job_add; |
| s->poll_fns.dispatch_add = handlers->dispatch_add; |
| s->poll_fns.dispatch_mod = handlers->dispatch_mod; |
| s->poll_fns.dispatch_del = handlers->dispatch_del; |
| } |
| |
| void |
| qb_ipcs_service_context_set(qb_ipcs_service_t* s, |
| void *context) |
| { |
| s->context = context; |
| } |
| |
| void * |
| qb_ipcs_service_context_get(qb_ipcs_service_t* s) |
| { |
| return s->context; |
| } |
| |
| int32_t |
| qb_ipcs_run(struct qb_ipcs_service *s) |
| { |
| int32_t res = 0; |
| |
| if (s->poll_fns.dispatch_add == NULL || |
| s->poll_fns.dispatch_mod == NULL || |
| s->poll_fns.dispatch_del == NULL) { |
| |
| res = -EINVAL; |
| goto run_cleanup; |
| } |
| |
| switch (s->type) { |
| case QB_IPC_SOCKET: |
| qb_ipcs_us_init((struct qb_ipcs_service *)s); |
| break; |
| case QB_IPC_SHM: |
| #ifdef DISABLE_IPC_SHM |
| res = -ENOTSUP; |
| #else |
| qb_ipcs_shm_init((struct qb_ipcs_service *)s); |
| #endif /* DISABLE_IPC_SHM */ |
| break; |
| case QB_IPC_POSIX_MQ: |
| case QB_IPC_SYSV_MQ: |
| res = -ENOTSUP; |
| break; |
| default: |
| res = -EINVAL; |
| break; |
| } |
| |
| if (res == 0) { |
| res = qb_ipcs_us_publish(s); |
| if (res < 0) { |
| (void)qb_ipcs_us_withdraw(s); |
| goto run_cleanup; |
| } |
| } |
| |
| run_cleanup: |
| if (res < 0) { |
| /* Failed to run services, removing initial alloc reference. */ |
| qb_ipcs_unref(s); |
| } |
| |
| return res; |
| } |
| |
| static int32_t |
| _modify_dispatch_descriptor_(struct qb_ipcs_connection *c) |
| { |
| qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod; |
| |
| if (c->service->type == QB_IPC_SOCKET) { |
| return disp_mod(c->service->poll_priority, |
| c->event.u.us.sock, |
| c->poll_events, c, |
| qb_ipcs_dispatch_connection_request); |
| } else { |
| return disp_mod(c->service->poll_priority, |
| c->setup.u.us.sock, |
| c->poll_events, c, |
| qb_ipcs_dispatch_connection_request); |
| } |
| return -EINVAL; |
| } |
| |
| void |
| qb_ipcs_request_rate_limit(struct qb_ipcs_service *s, |
| enum qb_ipcs_rate_limit rl) |
| { |
| struct qb_ipcs_connection *c; |
| enum qb_loop_priority old_p = s->poll_priority; |
| struct qb_list_head *pos; |
| struct qb_list_head *n; |
| |
| switch (rl) { |
| case QB_IPCS_RATE_FAST: |
| s->poll_priority = QB_LOOP_HIGH; |
| break; |
| case QB_IPCS_RATE_SLOW: |
| case QB_IPCS_RATE_OFF: |
| case QB_IPCS_RATE_OFF_2: |
| s->poll_priority = QB_LOOP_LOW; |
| break; |
| default: |
| case QB_IPCS_RATE_NORMAL: |
| s->poll_priority = QB_LOOP_MED; |
| break; |
| } |
| |
| qb_list_for_each_safe(pos, n, &s->connections) { |
| |
| c = qb_list_entry(pos, struct qb_ipcs_connection, list); |
| qb_ipcs_connection_ref(c); |
| |
| if (rl == QB_IPCS_RATE_OFF) { |
| qb_ipcs_flowcontrol_set(c, 1); |
| } else if (rl == QB_IPCS_RATE_OFF_2) { |
| qb_ipcs_flowcontrol_set(c, 2); |
| } else { |
| qb_ipcs_flowcontrol_set(c, QB_FALSE); |
| } |
| if (old_p != s->poll_priority) { |
| (void)_modify_dispatch_descriptor_(c); |
| } |
| qb_ipcs_connection_unref(c); |
| } |
| } |
| |
| void |
| qb_ipcs_ref(struct qb_ipcs_service *s) |
| { |
| qb_atomic_int_inc(&s->ref_count); |
| } |
| |
| void |
| qb_ipcs_unref(struct qb_ipcs_service *s) |
| { |
| int32_t free_it; |
| |
| assert(s->ref_count > 0); |
| free_it = qb_atomic_int_dec_and_test(&s->ref_count); |
| if (free_it) { |
| qb_util_log(LOG_DEBUG, "%s() - destroying", __func__); |
| free(s); |
| } |
| } |
| |
| void |
| qb_ipcs_destroy(struct qb_ipcs_service *s) |
| { |
| struct qb_ipcs_connection *c = NULL; |
| struct qb_list_head *pos; |
| struct qb_list_head *n; |
| |
| if (s == NULL) { |
| return; |
| } |
| qb_list_for_each_safe(pos, n, &s->connections) { |
| c = qb_list_entry(pos, struct qb_ipcs_connection, list); |
| if (c == NULL) { |
| continue; |
| } |
| qb_ipcs_disconnect(c); |
| } |
| (void)qb_ipcs_us_withdraw(s); |
| |
| /* service destroyed, remove initial alloc ref */ |
| qb_ipcs_unref(s); |
| } |
| |
| /* |
| * connection API |
| */ |
| static struct qb_ipc_one_way * |
| _event_sock_one_way_get(struct qb_ipcs_connection * c) |
| { |
| if (c->service->needs_sock_for_poll) { |
| return &c->setup; |
| } |
| if (c->event.type == QB_IPC_SOCKET) { |
| return &c->event; |
| } |
| return NULL; |
| } |
| |
| static struct qb_ipc_one_way * |
| _response_sock_one_way_get(struct qb_ipcs_connection * c) |
| { |
| if (c->service->needs_sock_for_poll) { |
| return &c->setup; |
| } |
| if (c->response.type == QB_IPC_SOCKET) { |
| return &c->response; |
| } |
| return NULL; |
| } |
| |
| ssize_t |
| qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data, |
| size_t size) |
| { |
| ssize_t res; |
| |
| if (c == NULL) { |
| return -EINVAL; |
| } |
| qb_ipcs_connection_ref(c); |
| res = c->service->funcs.send(&c->response, data, size); |
| if (res == size) { |
| c->stats.responses++; |
| } else if (res == -EAGAIN || res == -ETIMEDOUT) { |
| struct qb_ipc_one_way *ow = _response_sock_one_way_get(c); |
| if (ow) { |
| ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); |
| if (res2 < 0) { |
| res = res2; |
| } |
| } |
| c->stats.send_retries++; |
| } |
| qb_ipcs_connection_unref(c); |
| |
| return res; |
| } |
| |
| ssize_t |
| qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * iov, |
| size_t iov_len) |
| { |
| ssize_t res; |
| |
| if (c == NULL) { |
| return -EINVAL; |
| } |
| qb_ipcs_connection_ref(c); |
| res = c->service->funcs.sendv(&c->response, iov, iov_len); |
| if (res > 0) { |
| c->stats.responses++; |
| } else if (res == -EAGAIN || res == -ETIMEDOUT) { |
| struct qb_ipc_one_way *ow = _response_sock_one_way_get(c); |
| if (ow) { |
| ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); |
| if (res2 < 0) { |
| res = res2; |
| } |
| } |
| c->stats.send_retries++; |
| } |
| qb_ipcs_connection_unref(c); |
| |
| return res; |
| } |
| |
| static int32_t |
| resend_event_notifications(struct qb_ipcs_connection *c) |
| { |
| ssize_t res = 0; |
| |
| if (!c->service->needs_sock_for_poll) { |
| return res; |
| } |
| |
| if (c->outstanding_notifiers > 0) { |
| res = qb_ipc_us_send(&c->setup, c->receive_buf, |
| c->outstanding_notifiers); |
| } |
| if (res > 0) { |
| c->outstanding_notifiers -= res; |
| } |
| |
| assert(c->outstanding_notifiers >= 0); |
| if (c->outstanding_notifiers == 0) { |
| c->poll_events = POLLIN | POLLPRI | POLLNVAL; |
| (void)_modify_dispatch_descriptor_(c); |
| } |
| return res; |
| } |
| |
| static int32_t |
| new_event_notification(struct qb_ipcs_connection * c) |
| { |
| ssize_t res = 0; |
| |
| if (!c->service->needs_sock_for_poll) { |
| return res; |
| } |
| |
| assert(c->outstanding_notifiers >= 0); |
| if (c->outstanding_notifiers > 0) { |
| c->outstanding_notifiers++; |
| res = resend_event_notifications(c); |
| } else { |
| res = qb_ipc_us_send(&c->setup, &c->outstanding_notifiers, 1); |
| if (res == -EAGAIN) { |
| /* |
| * notify the client later, when we can. |
| */ |
| c->outstanding_notifiers++; |
| c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNVAL; |
| (void)_modify_dispatch_descriptor_(c); |
| } |
| } |
| return res; |
| } |
| |
| ssize_t |
| qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size) |
| { |
| ssize_t res; |
| ssize_t resn; |
| |
| if (c == NULL) { |
| return -EINVAL; |
| } else if (size > c->event.max_msg_size) { |
| return -EMSGSIZE; |
| } |
| |
| qb_ipcs_connection_ref(c); |
| res = c->service->funcs.send(&c->event, data, size); |
| if (res == size) { |
| c->stats.events++; |
| resn = new_event_notification(c); |
| if (resn < 0 && resn != -EAGAIN && resn != -ENOBUFS) { |
| errno = -resn; |
| qb_util_perror(LOG_WARNING, |
| "new_event_notification (%s)", |
| c->description); |
| res = resn; |
| } |
| } else if (res == -EAGAIN || res == -ETIMEDOUT) { |
| struct qb_ipc_one_way *ow = _event_sock_one_way_get(c); |
| |
| if (c->outstanding_notifiers > 0) { |
| resn = resend_event_notifications(c); |
| } |
| if (ow) { |
| resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); |
| if (resn < 0) { |
| res = resn; |
| } |
| } |
| c->stats.send_retries++; |
| } |
| |
| qb_ipcs_connection_unref(c); |
| return res; |
| } |
| |
| ssize_t |
| qb_ipcs_event_sendv(struct qb_ipcs_connection * c, |
| const struct iovec * iov, size_t iov_len) |
| { |
| ssize_t res; |
| ssize_t resn; |
| |
| if (c == NULL) { |
| return -EINVAL; |
| } |
| qb_ipcs_connection_ref(c); |
| |
| res = c->service->funcs.sendv(&c->event, iov, iov_len); |
| if (res > 0) { |
| c->stats.events++; |
| resn = new_event_notification(c); |
| if (resn < 0 && resn != -EAGAIN) { |
| errno = -resn; |
| qb_util_perror(LOG_WARNING, |
| "new_event_notification (%s)", |
| c->description); |
| res = resn; |
| } |
| } else if (res == -EAGAIN || res == -ETIMEDOUT) { |
| struct qb_ipc_one_way *ow = _event_sock_one_way_get(c); |
| |
| if (c->outstanding_notifiers > 0) { |
| resn = resend_event_notifications(c); |
| } |
| if (ow) { |
| resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT); |
| if (resn < 0) { |
| res = resn; |
| } |
| } |
| c->stats.send_retries++; |
| } |
| |
| qb_ipcs_connection_unref(c); |
| return res; |
| } |
| |
| qb_ipcs_connection_t * |
| qb_ipcs_connection_first_get(struct qb_ipcs_service * s) |
| { |
| struct qb_ipcs_connection *c; |
| |
| if (qb_list_empty(&s->connections)) { |
| return NULL; |
| } |
| |
| c = qb_list_first_entry(&s->connections, struct qb_ipcs_connection, |
| list); |
| qb_ipcs_connection_ref(c); |
| |
| return c; |
| } |
| |
| qb_ipcs_connection_t * |
| qb_ipcs_connection_next_get(struct qb_ipcs_service * s, |
| struct qb_ipcs_connection * current) |
| { |
| struct qb_ipcs_connection *c; |
| |
| if (current == NULL || |
| qb_list_is_last(¤t->list, &s->connections)) { |
| return NULL; |
| } |
| |
| c = qb_list_first_entry(¤t->list, struct qb_ipcs_connection, |
| list); |
| qb_ipcs_connection_ref(c); |
| |
| return c; |
| } |
| |
| int32_t |
| qb_ipcs_service_id_get(struct qb_ipcs_connection * c) |
| { |
| if (c == NULL) { |
| return -EINVAL; |
| } |
| return c->service->service_id; |
| } |
| |
| struct qb_ipcs_connection * |
| qb_ipcs_connection_alloc(struct qb_ipcs_service *s) |
| { |
| struct qb_ipcs_connection *c = |
| calloc(1, sizeof(struct qb_ipcs_connection)); |
| |
| if (c == NULL) { |
| return NULL; |
| } |
| |
| c->pid = 0; |
| c->euid = -1; |
| c->egid = -1; |
| c->receive_buf = NULL; |
| c->context = NULL; |
| c->fc_enabled = QB_FALSE; |
| c->state = QB_IPCS_CONNECTION_INACTIVE; |
| c->poll_events = POLLIN | POLLPRI | POLLNVAL; |
| |
| c->setup.type = s->type; |
| c->request.type = s->type; |
| c->response.type = s->type; |
| c->event.type = s->type; |
| (void)strlcpy(c->description, "not set yet", CONNECTION_DESCRIPTION); |
| |
| /* initial alloc ref */ |
| qb_ipcs_connection_ref(c); |
| |
| /* |
| * The connection makes use of the service object. Give the connection |
| * a reference to the service so we know the service can never be destroyed |
| * until the connection is done with it. |
| */ |
| qb_ipcs_ref(s); |
| c->service = s; |
| qb_list_init(&c->list); |
| |
| return c; |
| } |
| |
| void |
| qb_ipcs_connection_ref(struct qb_ipcs_connection *c) |
| { |
| if (c) { |
| qb_atomic_int_inc(&c->refcount); |
| } |
| } |
| |
| void |
| qb_ipcs_connection_unref(struct qb_ipcs_connection *c) |
| { |
| int32_t free_it; |
| |
| if (c == NULL) { |
| return; |
| } |
| if (c->refcount < 1) { |
| qb_util_log(LOG_ERR, "ref:%d state:%d (%s)", |
| c->refcount, c->state, c->description); |
| assert(0); |
| } |
| free_it = qb_atomic_int_dec_and_test(&c->refcount); |
| if (free_it) { |
| qb_list_del(&c->list); |
| if (c->service->serv_fns.connection_destroyed) { |
| c->service->serv_fns.connection_destroyed(c); |
| } |
| c->service->funcs.disconnect(c); |
| /* Let go of the connection's reference to the service */ |
| qb_ipcs_unref(c->service); |
| free(c->receive_buf); |
| free(c); |
| } |
| } |
| |
| void |
| qb_ipcs_disconnect(struct qb_ipcs_connection *c) |
| { |
| int32_t res = 0; |
| qb_loop_job_dispatch_fn rerun_job; |
| |
| if (c == NULL) { |
| return; |
| } |
| qb_util_log(LOG_DEBUG, "%s(%s) state:%d", |
| __func__, c->description, c->state); |
| |
| if (c->state == QB_IPCS_CONNECTION_ACTIVE) { |
| c->service->funcs.disconnect(c); |
| c->state = QB_IPCS_CONNECTION_INACTIVE; |
| c->service->stats.closed_connections++; |
| |
| /* This removes the initial alloc ref */ |
| qb_ipcs_connection_unref(c); |
| |
| /* return early as it's an incomplete connection. |
| */ |
| return; |
| } |
| if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) { |
| c->service->funcs.disconnect(c); |
| c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN; |
| c->service->stats.active_connections--; |
| c->service->stats.closed_connections++; |
| } |
| if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) { |
| int scheduled_retry = 0; |
| res = 0; |
| if (c->service->serv_fns.connection_closed) { |
| res = c->service->serv_fns.connection_closed(c); |
| } |
| if (res != 0) { |
| /* OK, so they want the connection_closed |
| * function re-run */ |
| rerun_job = |
| (qb_loop_job_dispatch_fn) qb_ipcs_disconnect; |
| res = c->service->poll_fns.job_add(QB_LOOP_LOW, |
| c, rerun_job); |
| if (res == 0) { |
| /* this function is going to be called again. |
| * so hold off on the unref */ |
| scheduled_retry = 1; |
| } |
| } |
| remove_tempdir(c->description); |
| if (scheduled_retry == 0) { |
| /* This removes the initial alloc ref */ |
| qb_ipcs_connection_unref(c); |
| } |
| } |
| |
| } |
| |
| static void |
| qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable) |
| { |
| if (c == NULL) { |
| return; |
| } |
| if (c->fc_enabled != fc_enable) { |
| c->service->funcs.fc_set(&c->request, fc_enable); |
| c->fc_enabled = fc_enable; |
| c->stats.flow_control_state = fc_enable; |
| c->stats.flow_control_count++; |
| } |
| } |
| |
| static int32_t |
| _process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout) |
| { |
| int32_t res = 0; |
| ssize_t size; |
| struct qb_ipc_request_header *hdr; |
| |
| if (c->service->funcs.peek && c->service->funcs.reclaim) { |
| size = c->service->funcs.peek(&c->request, (void **)&hdr, |
| ms_timeout); |
| } else { |
| hdr = c->receive_buf; |
| size = c->service->funcs.recv(&c->request, |
| hdr, |
| c->request.max_msg_size, |
| ms_timeout); |
| } |
| if (size < 0) { |
| if (size != -EAGAIN && size != -ETIMEDOUT) { |
| qb_util_perror(LOG_DEBUG, |
| "recv from client connection failed (%s)", |
| c->description); |
| } else { |
| c->stats.recv_retries++; |
| } |
| res = size; |
| goto cleanup; |
| } else if (size == 0 || hdr->id == QB_IPC_MSG_DISCONNECT) { |
| qb_util_log(LOG_DEBUG, "client requesting a disconnect (%s)", |
| c->description); |
| res = -ESHUTDOWN; |
| goto cleanup; |
| } else { |
| c->stats.requests++; |
| res = c->service->serv_fns.msg_process(c, hdr, hdr->size); |
| /* 0 == good, negative == backoff */ |
| if (res < 0) { |
| res = -ENOBUFS; |
| } else { |
| res = size; |
| } |
| } |
| |
| if (c && c->service->funcs.peek && c->service->funcs.reclaim) { |
| c->service->funcs.reclaim(&c->request); |
| } |
| |
| cleanup: |
| return res; |
| } |
| |
/* Timeout handed to _process_request_() (its ms_timeout argument) for each
 * request pulled by the dispatcher. */
#define IPC_REQUEST_TIMEOUT 10
/* Upper bound on requests handled per dispatch call; also the size of the
 * dispatcher's scratch buffer for notification bytes. */
#define MAX_RECV_MSGS 50
| |
| static ssize_t |
| _request_q_len_get(struct qb_ipcs_connection *c) |
| { |
| ssize_t q_len; |
| if (c->service->funcs.q_len_get) { |
| q_len = c->service->funcs.q_len_get(&c->request); |
| if (q_len <= 0) { |
| return q_len; |
| } |
| if (c->service->poll_priority == QB_LOOP_MED) { |
| q_len = QB_MIN(q_len, 5); |
| } else if (c->service->poll_priority == QB_LOOP_LOW) { |
| q_len = 1; |
| } else { |
| q_len = QB_MIN(q_len, MAX_RECV_MSGS); |
| } |
| } else { |
| q_len = 1; |
| } |
| return q_len; |
| } |
| |
| int32_t |
| qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data) |
| { |
| struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data; |
| char bytes[MAX_RECV_MSGS]; |
| int32_t res = 0; |
| int32_t res2; |
| int32_t recvd = 0; |
| ssize_t avail; |
| |
| if (revents & POLLNVAL) { |
| qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description); |
| res = -EINVAL; |
| goto dispatch_cleanup; |
| } |
| if (revents & POLLHUP) { |
| qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description); |
| res = -ESHUTDOWN; |
| goto dispatch_cleanup; |
| } |
| |
| if (revents & POLLOUT) { |
| /* try resend events now that fd can write */ |
| res = resend_event_notifications(c); |
| if (res < 0 && res != -EAGAIN) { |
| errno = -res; |
| qb_util_perror(LOG_WARNING, |
| "resend_event_notifications (%s)", |
| c->description); |
| } |
| /* nothing to read */ |
| if ((revents & POLLIN) == 0) { |
| res = 0; |
| goto dispatch_cleanup; |
| } |
| } |
| if (c->fc_enabled) { |
| res = 0; |
| goto dispatch_cleanup; |
| } |
| avail = _request_q_len_get(c); |
| |
| if (c->service->needs_sock_for_poll && avail == 0) { |
| res2 = qb_ipc_us_recv(&c->setup, bytes, 1, 0); |
| if (qb_ipc_us_sock_error_is_disconnected(res2)) { |
| errno = -res2; |
| qb_util_perror(LOG_WARNING, "conn (%s) disconnected", |
| c->description); |
| res = -ESHUTDOWN; |
| goto dispatch_cleanup; |
| } else { |
| qb_util_log(LOG_WARNING, |
| "conn (%s) Nothing in q but got POLLIN on fd:%d (res2:%d)", |
| c->description, fd, res2); |
| res = 0; |
| goto dispatch_cleanup; |
| } |
| } |
| |
| do { |
| res = _process_request_(c, IPC_REQUEST_TIMEOUT); |
| |
| if (res == -ESHUTDOWN) { |
| goto dispatch_cleanup; |
| } |
| |
| if (res > 0 || res == -ENOBUFS || res == -EINVAL) { |
| recvd++; |
| } |
| if (res > 0) { |
| avail--; |
| } |
| } while (avail > 0 && res > 0 && !c->fc_enabled); |
| |
| if (c->service->needs_sock_for_poll && recvd > 0) { |
| res2 = qb_ipc_us_recv(&c->setup, bytes, recvd, -1); |
| if (qb_ipc_us_sock_error_is_disconnected(res2)) { |
| errno = -res2; |
| qb_util_perror(LOG_ERR, "error receiving from setup sock (%s)", c->description); |
| |
| res = -ESHUTDOWN; |
| goto dispatch_cleanup; |
| } |
| } |
| |
| res = QB_MIN(0, res); |
| if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) { |
| res = 0; |
| } |
| if (res != 0) { |
| if (res != -ENOTCONN) { |
| /* |
| * Abnormal state (ENOTCONN is normal shutdown). |
| */ |
| errno = -res; |
| qb_util_perror(LOG_ERR, "request returned error (%s)", |
| c->description); |
| } |
| } |
| |
| dispatch_cleanup: |
| if (res != 0) { |
| qb_ipcs_disconnect(c); |
| } |
| return res; |
| } |
| |
| void |
| qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context) |
| { |
| if (c == NULL) { |
| return; |
| } |
| c->context = context; |
| } |
| |
| void * |
| qb_ipcs_context_get(struct qb_ipcs_connection *c) |
| { |
| if (c == NULL) { |
| return NULL; |
| } |
| return c->context; |
| } |
| |
| void * |
| qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c) |
| { |
| if (c == NULL || c->service == NULL) { |
| return NULL; |
| } |
| return c->service->context; |
| } |
| |
| int32_t |
| qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c, |
| struct qb_ipcs_connection_stats * stats, |
| int32_t clear_after_read) |
| { |
| if (c == NULL) { |
| return -EINVAL; |
| } |
| memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats)); |
| if (clear_after_read) { |
| memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2)); |
| c->stats.client_pid = c->pid; |
| } |
| return 0; |
| } |
| |
| struct qb_ipcs_connection_stats_2* |
| qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c, |
| int32_t clear_after_read) |
| { |
| struct qb_ipcs_connection_stats_2 * stats; |
| |
| if (c == NULL) { |
| errno = EINVAL; |
| return NULL; |
| } |
| stats = calloc(1, sizeof(struct qb_ipcs_connection_stats_2)); |
| if (stats == NULL) { |
| return NULL; |
| } |
| |
| memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats_2)); |
| |
| if (c->service->funcs.q_len_get) { |
| stats->event_q_length = c->service->funcs.q_len_get(&c->event); |
| } else { |
| stats->event_q_length = 0; |
| } |
| if (clear_after_read) { |
| memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2)); |
| c->stats.client_pid = c->pid; |
| } |
| return stats; |
| } |
| |
| int32_t |
| qb_ipcs_stats_get(struct qb_ipcs_service * s, |
| struct qb_ipcs_stats * stats, int32_t clear_after_read) |
| { |
| if (s == NULL) { |
| return -EINVAL; |
| } |
| memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats)); |
| if (clear_after_read) { |
| memset(&s->stats, 0, sizeof(struct qb_ipcs_stats)); |
| } |
| return 0; |
| } |
| |
| void |
| qb_ipcs_connection_auth_set(qb_ipcs_connection_t *c, uid_t uid, |
| gid_t gid, mode_t mode) |
| { |
| if (c) { |
| c->auth.uid = uid; |
| c->auth.gid = gid; |
| c->auth.mode = mode; |
| } |
| } |
| |
| int32_t |
| qb_ipcs_connection_get_buffer_size(qb_ipcs_connection_t *c) |
| { |
| if (c == NULL) { |
| return -EINVAL; |
| } |
| |
| /* request, response, and event shoud all have the same |
| * buffer size allocated. It doesn't matter which we return |
| * here. */ |
| return c->response.max_msg_size; |
| } |
| |
| void qb_ipcs_enforce_buffer_size(qb_ipcs_service_t *s, uint32_t buf_size) |
| { |
| if (s == NULL) { |
| return; |
| } |
| s->max_buffer_size = buf_size; |
| } |