blob: dbde2757a071a263ecb017186c849995ee5eb58e [file] [log] [blame]
/*
* Copyright (C) 2010 Red Hat, Inc.
*
* Author: Angus Salkeld <asalkeld@redhat.com>
*
* This file is part of libqb.
*
* libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ringbuffer_int.h"
#include <qb/qbdefs.h>
static int32_t
my_posix_sem_timedwait(void * instance, int32_t ms_timeout)
{
struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance;
struct timespec ts_timeout;
int32_t res;
if (ms_timeout > 0) {
qb_util_timespec_from_epoch_get(&ts_timeout);
qb_timespec_add_ms(&ts_timeout, ms_timeout);
}
sem_wait_again:
if (ms_timeout > 0) {
res = rpl_sem_timedwait(&rb->shared_hdr->posix_sem, &ts_timeout);
} else if (ms_timeout == 0) {
res = rpl_sem_trywait(&rb->shared_hdr->posix_sem);
} else {
res = rpl_sem_wait(&rb->shared_hdr->posix_sem);
}
if (res == -1) {
switch (errno) {
case EINTR:
goto sem_wait_again;
break;
case EAGAIN:
res = -ETIMEDOUT;
break;
case ETIMEDOUT:
res = -errno;
break;
default:
res = -errno;
qb_util_perror(LOG_ERR, "error waiting for semaphore");
break;
}
}
return res;
}
static int32_t
my_posix_sem_post(void * instance, size_t msg_size)
{
struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance;
if (rpl_sem_post(&rb->shared_hdr->posix_sem) < 0) {
return -errno;
} else {
return 0;
}
}
static ssize_t
my_posix_getvalue_fn(void * instance)
{
struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance;
int val;
if (rpl_sem_getvalue(&rb->shared_hdr->posix_sem, &val) < 0) {
return -errno;
} else {
return val;
}
}
static int32_t
my_posix_sem_destroy(void * instance)
{
struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance;
qb_enter();
if (rpl_sem_destroy(&rb->shared_hdr->posix_sem) == -1) {
return -errno;
} else {
return 0;
}
}
static int32_t
my_posix_sem_create(void * instance, uint32_t flags)
{
struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance;
int32_t pshared = QB_FALSE;
if (flags & QB_RB_FLAG_SHARED_PROCESS) {
if ((flags & QB_RB_FLAG_CREATE) == 0) {
return 0;
}
pshared = QB_TRUE;
}
if (rpl_sem_init(&rb->shared_hdr->posix_sem, pshared, 0) == -1) {
return -errno;
} else {
return 0;
}
}
static int32_t
my_sysv_sem_timedwait(void * instance, int32_t ms_timeout)
{
struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance;
struct sembuf sops[1];
int32_t res = 0;
#ifdef HAVE_SEMTIMEDOP
struct timespec ts_timeout;
struct timespec *ts_pt;
if (ms_timeout >= 0) {
/*
* Note: sem_timedwait takes an absolute time where as semtimedop
* takes a relative time.
*/
ts_timeout.tv_sec = 0;
ts_timeout.tv_nsec = 0;
qb_timespec_add_ms(&ts_timeout, ms_timeout);
ts_pt = &ts_timeout;
} else {
ts_pt = NULL;
}
#endif /* HAVE_SEMTIMEDOP */
/*
* wait for sem post.
*/
sops[0].sem_num = 0;
sops[0].sem_op = -1;
#ifdef HAVE_SEMTIMEDOP
sops[0].sem_flg = 0;
#else
sops[0].sem_flg = IPC_NOWAIT;
#endif /* HAVE_SEMTIMEDOP */
semop_again:
#ifdef HAVE_SEMTIMEDOP
if (semtimedop(rb->sem_id, sops, 1, ts_pt) == -1)
#else
if (semop(rb->sem_id, sops, 1) == -1)
#endif /* HAVE_SEMTIMEDOP */
{
if (errno == EINTR) {
goto semop_again;
} else if (errno == EAGAIN) {
/* make consistent with sem_timedwait */
res = -ETIMEDOUT;
} else {
res = -errno;
qb_util_perror(LOG_ERR, "error waiting for semaphore");
}
return res;
}
return 0;
}
static int32_t
my_sysv_sem_post(void * instance, size_t msg_size)
{
struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance;
struct sembuf sops[1];
if ((rb->flags & QB_RB_FLAG_SHARED_PROCESS) == 0) {
return 0;
}
sops[0].sem_num = 0;
sops[0].sem_op = 1;
sops[0].sem_flg = 0;
semop_again:
if (semop(rb->sem_id, sops, 1) == -1) {
if (errno == EINTR) {
goto semop_again;
} else {
qb_util_perror(LOG_ERR,
"could not increment semaphore");
}
return -errno;
}
return 0;
}
static ssize_t
my_sysv_getvalue_fn(void * instance)
{
struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance;
ssize_t res = semctl(rb->sem_id, 0, GETVAL, 0);
if (res == -1) {
return -errno;
}
return res;
}
static int32_t
my_sysv_sem_destroy(void * instance)
{
struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance;
if (semctl(rb->sem_id, 0, IPC_RMID, 0) == -1) {
return -errno;
} else {
return 0;
}
}
static int32_t
my_sysv_sem_create(void * instance, uint32_t flags)
{
struct qb_ringbuffer_s *rb = (struct qb_ringbuffer_s *)instance;
union semun options;
int32_t res;
key_t sem_key;
sem_key = ftok(rb->shared_hdr->hdr_path, (rb->shared_hdr->word_size + 1));
if (sem_key == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "couldn't get a sem id");
return res;
}
if (flags & QB_RB_FLAG_CREATE) {
rb->sem_id = semget(sem_key, 1, IPC_CREAT | IPC_EXCL | 0600);
if (rb->sem_id == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "couldn't create a semaphore");
return res;
}
options.val = 0;
res = semctl(rb->sem_id, 0, SETVAL, options);
} else {
rb->sem_id = semget(sem_key, 0, 0600);
if (rb->sem_id == -1) {
res = -errno;
qb_util_perror(LOG_ERR, "couldn't get a sem id");
return res;
}
res = 0;
}
qb_util_log(LOG_DEBUG, "sem key:%d, id:%d, value:%d",
(int)sem_key, rb->sem_id, semctl(rb->sem_id, 0, GETVAL, 0));
return res;
}
int32_t
qb_rb_sem_create(struct qb_ringbuffer_s * rb, uint32_t flags)
{
int32_t rc;
int32_t use_posix = QB_TRUE;
if ((flags & QB_RB_FLAG_SHARED_PROCESS) &&
!(flags & QB_RB_FLAG_NO_SEMAPHORE)) {
#if defined(HAVE_POSIX_PSHARED_SEMAPHORE) || \
defined(HAVE_RPL_PSHARED_SEMAPHORE)
use_posix = QB_TRUE;
#else
#ifdef HAVE_SYSV_PSHARED_SEMAPHORE
use_posix = QB_FALSE;
#else
return -ENOTSUP;
#endif /* HAVE_SYSV_PSHARED_SEMAPHORE */
#endif /* HAVE_POSIX_PSHARED_SEMAPHORE */
}
if (flags & QB_RB_FLAG_NO_SEMAPHORE) {
rc = 0;
rb->notifier.instance = NULL;
rb->notifier.timedwait_fn = NULL;
rb->notifier.post_fn = NULL;
rb->notifier.q_len_fn = NULL;
rb->notifier.space_used_fn = NULL;
rb->notifier.destroy_fn = NULL;
} else if (use_posix) {
rc = my_posix_sem_create(rb, flags);
rb->notifier.instance = rb;
rb->notifier.timedwait_fn = my_posix_sem_timedwait;
rb->notifier.post_fn = my_posix_sem_post;
rb->notifier.q_len_fn = my_posix_getvalue_fn;
rb->notifier.space_used_fn = NULL;
rb->notifier.destroy_fn = my_posix_sem_destroy;
} else {
rc = my_sysv_sem_create(rb, flags);
rb->notifier.instance = rb;
rb->notifier.timedwait_fn = my_sysv_sem_timedwait;
rb->notifier.post_fn = my_sysv_sem_post;
rb->notifier.q_len_fn = my_sysv_getvalue_fn;
rb->notifier.space_used_fn = NULL;
rb->notifier.destroy_fn = my_sysv_sem_destroy;
}
return rc;
}
/* For qb_rb_close_helper, we need to open directory in read-only
mode and with as lightweight + strict flags as available at
given platform (O_PATH for the former, O_DIRECTORY for the
latter); end result is available as RB_DIR_RO_FLAGS.
*/
#if defined(HAVE_OPENAT) && defined(HAVE_UNLINKAT)
# ifndef O_DIRECTORY
# define RB_DIR_RO_FLAGS1 O_RDONLY
# else
# define RB_DIR_RO_FLAGS1 O_RDONLY|O_DIRECTORY
# endif
# ifndef O_PATH
# define RB_DIR_RO_FLAGS RB_DIR_RO_FLAGS1
# else
# define RB_DIR_RO_FLAGS RB_DIR_RO_FLAGS1|O_PATH
# endif
int32_t
qb_rb_close_helper(struct qb_ringbuffer_s * rb, int32_t unlink_it,
int32_t truncate_fallback)
{
int32_t res = 0, res2 = 0;
uint32_t word_size = rb->shared_hdr->word_size;
char *hdr_path = rb->shared_hdr->hdr_path;
if (unlink_it) {
qb_util_log(LOG_DEBUG, "Free'ing ringbuffer: %s", hdr_path);
if (rb->notifier.destroy_fn) {
(void)rb->notifier.destroy_fn(rb->notifier.instance);
}
} else {
qb_util_log(LOG_DEBUG, "Closing ringbuffer: %s", hdr_path);
hdr_path = NULL;
}
if (unlink_it) {
char *data_path = rb->shared_hdr->data_path;
char *sep = strrchr(data_path, '/');
/* we could modify data_path in-situ, but that would segfault if
we hadn't write permissions to the underlying mmap'd file */
char dir_path[PATH_MAX];
int dirfd;
if (sep != NULL) {
strncpy(dir_path, data_path, sep - data_path);
dir_path[sep - data_path] = '\0';
if ((dirfd = open(dir_path, RB_DIR_RO_FLAGS)) != -1) {
res = qb_sys_unlink_or_truncate_at(dirfd, sep + 1,
truncate_fallback);
/* the dirname part is assumed to be the same */
assert(!strncmp(dir_path, hdr_path, sep - data_path));
sep = hdr_path + (sep - data_path);
/* now, don't touch neither data_path nor hdr_path */
res2 = qb_sys_unlink_or_truncate_at(dirfd, sep + 1,
truncate_fallback);
close(dirfd);
} else {
res = -errno;
qb_util_perror(LOG_DEBUG,
"Cannot open dir: %s", hdr_path);
}
} else {
res = -EINVAL;
qb_util_perror(LOG_DEBUG,
"Not dir-separable path: %s", hdr_path);
}
#else
res = qb_sys_unlink_or_truncate(data_path, truncate_fallback);
res2 = qb_sys_unlink_or_truncate(hdr_path, truncate_fallback);
#endif /* defined(HAVE_OPENAT) && defined(HAVE_UNLINKAT) */
res = res ? res : res2;
hdr_path = NULL;
} /* if (unlink_it) */
if (munmap(rb->shared_data, (word_size * sizeof(uint32_t)) << 1) == -1) {
res = res ? res : -errno;
qb_util_perror(LOG_DEBUG, "Cannot munmap shared_data");
}
if (munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s)) == -1) {
res = res ? res : -errno;
qb_util_perror(LOG_DEBUG, "Cannot munmap shared_hdr");
}
free(rb);
return res;
}