blob: 62433e64f789ad3ff7292e45add7bb49948aa5c4 [file] [log] [blame]
/****************************************************************************
**
** Copyright (C) 2017-2016 Ford Motor Company
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtRemoteObjects module of the Qt Toolkit.
**
** $QT_BEGIN_LICENSE:LGPL$
** Commercial License Usage
** Licensees holding valid commercial Qt licenses may use this file in
** accordance with the commercial license agreement provided with the
** Software or, alternatively, in accordance with the terms contained in
** a written agreement between you and The Qt Company. For licensing terms
** and conditions see https://www.qt.io/terms-conditions. For further
** information use the contact form at https://www.qt.io/contact-us.
**
** GNU Lesser General Public License Usage
** Alternatively, this file may be used under the terms of the GNU Lesser
** General Public License version 3 as published by the Free Software
** Foundation and appearing in the file LICENSE.LGPL3 included in the
** packaging of this file. Please review the following information to
** ensure the GNU Lesser General Public License version 3 requirements
** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
**
** GNU General Public License Usage
** Alternatively, this file may be used under the terms of the GNU
** General Public License version 2.0 or (at your option) the GNU General
** Public license version 3 or any later version approved by the KDE Free
** Qt Foundation. The licenses are as published by the Free Software
** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
** included in the packaging of this file. Please review the following
** information to ensure the GNU General Public License requirements will
** be met: https://www.gnu.org/licenses/gpl-2.0.html and
** https://www.gnu.org/licenses/gpl-3.0.html.
**
** $QT_END_LICENSE$
**
****************************************************************************/
#include "qconnection_qnx_global_p.h"
#include "qconnection_qnx_qiodevices_p.h"
#include "qconnection_qnx_server_p.h"
QT_BEGIN_NAMESPACE
QQnxNativeServer::QQnxNativeServer(QObject *parent)
: QObject(*new QQnxNativeServerPrivate, parent)
{
}
QQnxNativeServer::~QQnxNativeServer()
{
}
void QQnxNativeServer::close()
{
Q_D(QQnxNativeServer);
d->teardownServer();
}
bool QQnxNativeServer::hasPendingConnections() const
{
Q_D(const QQnxNativeServer);
d->mutex.lock();
const int len = d->pending.length();
d->mutex.unlock();
return len > 0;
}
bool QQnxNativeServer::isListening() const
{
Q_D(const QQnxNativeServer);
return !(d->serverName.isEmpty());
}
bool QQnxNativeServer::listen(const QString &name)
{
Q_D(QQnxNativeServer);
if (isListening()) {
qCWarning(QT_REMOTEOBJECT) << "QQnxNativeServer::listen() called when already listening";
return false;
}
if (name.isEmpty()) {
d->error = QAbstractSocket::HostNotFoundError;
QString function = QLatin1String("QQnxNativeServer::listen");
d->errorString = tr("%1: Name error").arg(function);
return false;
}
if (!d->listen(name)) {
d->serverName.clear();
return false;
}
d->serverName = name;
return true;
}
QSharedPointer<QIOQnxSource> QQnxNativeServer::nextPendingConnection()
{
Q_D(QQnxNativeServer);
d->mutex.lock();
Q_ASSERT(d->pending.length() > 0);
auto io = d->pending.takeFirst();
d->mutex.unlock();
return io;
}
QString QQnxNativeServer::serverName() const
{
Q_D(const QQnxNativeServer);
return d->serverName;
}
bool QQnxNativeServer::waitForNewConnection(int msec, bool *timedOut)
{
//TODO - This method isn't used by Qt Remote Objects, but would
//need to be implemented before this class could be used as a
//connection server (like QTcpServer or QLocalServer).
Q_UNUSED(msec);
Q_UNUSED(timedOut);
Q_ASSERT(false);
return false;
}
void QQnxNativeServer::onSourceClosed()
{
Q_D(QQnxNativeServer);
QIOQnxSource *conn = qobject_cast<QIOQnxSource *>(sender());
Q_ASSERT(conn);
d->cleanupIOSource(conn);
}
QQnxNativeServerPrivate::QQnxNativeServerPrivate()
: error(QAbstractSocket::UnknownSocketError)
, thread(this, QStringLiteral("NativeServer"))
{
}
QQnxNativeServerPrivate::~QQnxNativeServerPrivate()
{
if (thread.isRunning())
teardownServer();
}
// method (run in a thread) to watch for connections and handle receiving data
void QQnxNativeServerPrivate::thread_func()
{
struct _msg_info msg_info;
recv_msgs recv_buf;
QVector<iov_t> resp_repeat_iov(5);
qCDebug(QT_REMOTEOBJECT) << "Server thread_func running";
while (running.load()) {
// wait for messages and pulses
int rcvid = MsgReceive(attachStruct->chid, &recv_buf, sizeof(_pulse), &msg_info);
qCDebug(QT_REMOTEOBJECT) << "MsgReceive unblocked. Rcvid" << rcvid << " Scoid" << msg_info.scoid;
if (rcvid == -1) {
if (errno != ESRCH) // ESRCH means the channel closed
WARNING(MsgReceive)
break;
}
if (0 == rcvid) {
qCDebug(QT_REMOTEOBJECT) << "Pulse" << recv_buf.pulse.code;
/* we received a pulse
*/
switch (recv_buf.pulse.code) {
case _PULSE_CODE_DISCONNECT:
{
/* a client has disconnected. Verify that it is
* our client, and if so, clean up our saved state
*/
const int scoid = recv_buf.pulse.scoid;
const QSet<int> coids = connections.take(scoid);
for (int coid : coids)
{
const uint64_t uid = static_cast<uint64_t>(scoid) << 32 | static_cast<uint32_t>(coid);
QSharedPointer<QIOQnxSource> io;
mutex.lock();
if (sources.contains(uid))
io = sources.take(uid);
mutex.unlock();
#ifdef USE_HAM
ham_action_t *action = hamActions.take(uid);
ham_action_remove(action, 0);
ham_action_handle_free(action);
#endif
if (!io.isNull()) {
io->d_func()->m_serverClosing.ref();
QMetaObject::invokeMethod(io.data(),"onDisconnected",Qt::QueuedConnection);
}
}
/* always do the ConnectDetach() */
//Don't care about return value
ConnectDetach(recv_buf.pulse.scoid);
qCDebug(QT_REMOTEOBJECT) << "disconnect from client" << recv_buf.pulse.scoid;
}
break;
case _PULSE_CODE_COIDDEATH:
{
/* a connection has gone away. Verify that it is
* our client, and if so, clean up our saved state
*/
const int coid = recv_buf.pulse.value.sival_int;
if (ConnectServerInfo(0, coid, nullptr) != coid) {
const int scoid = recv_buf.pulse.scoid;
if (connections.value(scoid).contains(coid))
connections[scoid].remove(coid);
QSharedPointer<QIOQnxSource> io;
const uint64_t uid = static_cast<uint64_t>(scoid) << 32 | static_cast<uint32_t>(coid);
mutex.lock();
if (sources.contains(uid))
io = sources.take(uid);
mutex.unlock();
#ifdef USE_HAM
ham_action_t *action = hamActions.take(uid);
ham_action_remove(action, 0);
ham_action_handle_free(action);
#endif
if (!io.isNull()) {
io->d_func()->m_serverClosing.ref();
QMetaObject::invokeMethod(io.data(),"onDisconnected",Qt::QueuedConnection);
}
qCDebug(QT_REMOTEOBJECT) << "Connection dropped" << coid;
}
break;
}
break;
case _PULSE_CODE_UNBLOCK:
if (running.load()) {
// did we forget to Reply to our client?
qCWarning(QT_REMOTEOBJECT) << "got an unblock pulse, did you forget to reply to your client?";
WARN_ON_ERROR(MsgError, recv_buf.pulse.value.sival_int, EINTR)
}
break;
default:
qCWarning(QT_REMOTEOBJECT) << "unexpected pulse code: " << recv_buf.pulse.code << __FILE__ << __LINE__;
break;
}
continue;
}
/* not an error, not a pulse, therefore a message */
switch (recv_buf.type)
{
qCDebug(QT_REMOTEOBJECT) << "Msg Received" << recv_buf.type;
case _IO_CONNECT:
/* _IO_CONNECT because someone did a name_open() to us and we are
* in the network case (gns running). We must EOK it. */
if (connections.contains(msg_info.scoid)
&& connections.value(msg_info.scoid).contains(msg_info.coid))
{
qCWarning(QT_REMOTEOBJECT) << "Already connected rcvid seen" << connections << rcvid;
FATAL_ON_ERROR(MsgError, rcvid, EADDRINUSE)
} else {
if (!connections.contains(msg_info.scoid))
connections.insert(msg_info.scoid, QSet<int>());
connections[msg_info.scoid] << msg_info.coid;
qCDebug(QT_REMOTEOBJECT) << "New connection (qns)" << msg_info.coid << msg_info.scoid;
const uint64_t uid = static_cast<uint64_t>(msg_info.scoid) << 32 | static_cast<uint32_t>(msg_info.coid);
createSource(rcvid, uid, msg_info.pid); // Reads more and then calls MsgReply
}
break;
case MsgType::REPLICA_INIT:
qCDebug(QT_REMOTEOBJECT) << "MsgType::INIT received" << rcvid << msg_info.scoid << msg_info.tid << msg_info.chid << msg_info.coid;
//Check if this is a new connection (not gns)
if (connections.contains(msg_info.scoid)
&& connections.value(msg_info.scoid).contains(msg_info.coid))
{
qCWarning(QT_REMOTEOBJECT) << "Already connected rcvid seen" << connections << rcvid;
FATAL_ON_ERROR(MsgError, rcvid, EADDRINUSE)
} else {
if (!connections.contains(msg_info.scoid))
connections.insert(msg_info.scoid, QSet<int>());
connections[msg_info.scoid] << msg_info.coid;
qCDebug(QT_REMOTEOBJECT) << "New connection (non-gns)" << rcvid << msg_info.coid << msg_info.scoid;
const uint64_t uid = static_cast<uint64_t>(msg_info.scoid) << 32 | static_cast<uint32_t>(msg_info.coid);
createSource(rcvid, uid, msg_info.pid); // Reads more and then calls MsgReply
}
break;
case MsgType::SOURCE_TX_RESP:
{
WARN_ON_ERROR(MsgInfo, rcvid, &msg_info);
const uint64_t uid = static_cast<uint64_t>(msg_info.scoid) << 32 | static_cast<uint32_t>(msg_info.coid);
mutex.lock();
Q_ASSERT(sources.contains(uid));
auto io = sources.value(uid);
mutex.unlock();
io->d_func()->obLock.lockForWrite(); //NAR (Not-An-Error)
const QByteArray data = io->d_func()->obuffer.read();
const int bytesLeft = io->d_func()->obuffer.size();
io->d_func()->obLock.unlock();
qCDebug(QT_REMOTEOBJECT) << "server received SOURCE_TX_RESP with length" << msg_info.dstmsglen << "/" << data.length() << "Available:" << bytesLeft;
Q_ASSERT(data.length() == static_cast<int>(msg_info.dstmsglen - sizeof(bytesLeft)));
iov_t reply_vector[2];
SETIOV(reply_vector, &bytesLeft, sizeof(bytesLeft));
SETIOV(reply_vector+1, data.data(), data.length());
FATAL_ON_ERROR(MsgReplyv, rcvid, EOK, reply_vector, 2)
}
break;
case MsgType::SOURCE_TX_RESP_REPEAT:
{
WARN_ON_ERROR(MsgInfo, rcvid, &msg_info);
const uint64_t uid = static_cast<uint64_t>(msg_info.scoid) << 32 | static_cast<uint32_t>(msg_info.coid);
mutex.lock();
Q_ASSERT(sources.contains(uid));
auto io = sources.value(uid);
mutex.unlock();
int len_taken = 0;
const int to_send = msg_info.dstmsglen - sizeof(int); //Exclude the buffer count
QVector<QByteArray> qba_array;
io->d_func()->obLock.lockForWrite(); //NAR (Not-An-Error)
qCDebug(QT_REMOTEOBJECT) << "server received SOURCE_TX_RESP_REPEAT with length" << msg_info.dstmsglen << "Available:" << io->d_func()->obuffer.size();
while (len_taken != to_send)
{
QByteArray data = io->d_func()->obuffer.read();
qba_array << data;
len_taken += data.length();
if (data.length() == 0) { // We somehow reached the end
qCWarning(QT_REMOTEOBJECT) << "Reached the end of the buffer before getting the requested amount of data." << Q_FUNC_INFO << __FILE__ << __LINE__;
break;
}
}
qCDebug(QT_REMOTEOBJECT) << "grabbing more data" << len_taken << to_send << qba_array.length();
io->d_func()->obLock.unlock();
Q_ASSERT(len_taken == to_send);
const int buffers_taken = qba_array.length();
resp_repeat_iov.resize(buffers_taken+1);
SETIOV(&resp_repeat_iov[0], &buffers_taken, sizeof(buffers_taken));
for (int i = 1; i <= buffers_taken; ++i)
SETIOV(&resp_repeat_iov[i], qba_array.at(i-1).constData(), qba_array.at(i-1).length());
FATAL_ON_ERROR(MsgReplyv, rcvid, EOK, resp_repeat_iov.data(), buffers_taken+1)
}
break;
case MsgType::REPLICA_TX_RECV:
{
FATAL_ON_ERROR(MsgInfo,rcvid, &msg_info)
const uint64_t uid = static_cast<uint64_t>(msg_info.scoid) << 32 | static_cast<uint32_t>(msg_info.coid);
mutex.lock();
Q_ASSERT(sources.contains(uid));
auto io = sources.value(uid);
mutex.unlock();
//Long-lock, use buffer+memcpy if we run into trouble
io->d_func()->ibLock.lockForWrite();
qint32 toRead = msg_info.msglen-sizeof(MsgType);
char *payload = io->d_func()->buffer.reserve(toRead);
const int res = MsgRead(rcvid, payload, toRead, sizeof(MsgType));
if (-1 == res) {
io->d_func()->buffer.chop(toRead);
io->d_func()->ibLock.unlock();
qFatal("MsgRead");
}
Q_ASSERT(res == toRead);
if (res < toRead)
io->d_func()->buffer.chop(toRead - res);
io->d_func()->ibLock.unlock();
FATAL_ON_ERROR(MsgReply, rcvid, EOK, nullptr, 0)
qCDebug(QT_REMOTEOBJECT) << "server received REPLICA_TX_RECV" << payload << toRead;
emit io->readyRead();
}
break;
default:
/* some other unexpected message */
qCWarning(QT_REMOTEOBJECT) << "unexpected message type" << recv_buf.type << __FILE__ << __LINE__;
WARN_ON_ERROR(MsgError, rcvid, ENOSYS)
break;
}
}
mutex.lock();
for (auto io: sources)
io->d_func()->m_serverClosing.ref();
mutex.unlock();
name_detach(attachStruct, 0);
attachStruct = nullptr;
qCDebug(QT_REMOTEOBJECT) << "Server thread_func stopped";
}
bool QQnxNativeServerPrivate::listen(const QString &name)
{
attachStruct = name_attach(nullptr, qPrintable(name), 0);
if (attachStruct == nullptr) {
qCDebug(QT_REMOTEOBJECT, "name_attach call failed");
return false;
}
terminateCoid = ConnectAttach(ND_LOCAL_NODE, 0, attachStruct->chid, _NTO_SIDE_CHANNEL, 0);
if (terminateCoid == -1) {
qCDebug(QT_REMOTEOBJECT, "ConnectAttach failed");
return false;
}
running.ref();
thread.start();
return true;
}
void QQnxNativeServerPrivate::cleanupIOSource(QIOQnxSource *conn)
{
QSharedPointer<QIOQnxSource> io;
mutex.lock();
for (auto i = sources.begin(), end = sources.end(); i != end; ++i) {
if (i.value().data() == conn) {
io = std::move(i.value());
sources.erase(i);
break;
}
}
mutex.unlock();
if (!io.isNull()) {
io->d_func()->m_serverClosing.ref();
io->close();
}
}
void QQnxNativeServerPrivate::teardownServer()
{
if (attachStruct == nullptr)
return;
running.deref();
MsgSendPulse(terminateCoid, SIGEV_PULSE_PRIO_INHERIT, _PULSE_CODE_UNBLOCK, 0);
ConnectDetach(terminateCoid);
thread.wait();
//Existing QIOQnxSources will be deleted along with object
//threads gone, don't need to use mutex
sources.clear();
#ifdef USE_HAM
if (hamAvailable)
closeHamResources();
#endif
}
void QQnxNativeServerPrivate::createSource(int rcvid, uint64_t uid, pid_t toPid)
{
Q_Q(QQnxNativeServer);
#ifndef USE_HAM
Q_UNUSED(toPid);
#endif
auto io = QSharedPointer<QIOQnxSource>(new QIOQnxSource(rcvid));
io->moveToThread(q->thread());
QObject::connect(io.data(), &QIOQnxSource::aboutToClose,
q, &QQnxNativeServer::onSourceClosed);
QIOQnxSourcePrivate *iop = io->d_func();
FATAL_ON_ERROR(MsgRead, rcvid, &(iop->m_event), sizeof(sigevent), sizeof(MsgType))
int sentChannelId;
FATAL_ON_ERROR(MsgRead, rcvid, &sentChannelId, sizeof(int), sizeof(MsgType)+sizeof(sigevent))
FATAL_ON_ERROR(MsgReply, rcvid, EOK, nullptr, 0)
mutex.lock();
sources.insert(uid, io);
pending.append(io);
mutex.unlock();
//push an event into the main threads eventloop to emit newConnection.
QMetaObject::invokeMethod(q,"newConnection",Qt::QueuedConnection);
#ifdef USE_HAM
if (!hamInitialized) {
hamInitialized = true;
hamAvailable = initializeHam();
}
if (hamAvailable)
configureHamDeath(sentChannelId, toPid, uid);
#endif
}
#ifdef USE_HAM
bool QQnxNativeServerPrivate::initializeHam()
{
if (access("/proc/ham",F_OK) != 0) {
WARNING(access(/proc/ham))
return false;
}
ham_connect(0);
pid_t pid = getpid();
hamEntityHandle = ham_attach(qPrintable(serverName), ND_LOCAL_NODE, pid, NULL, 0);
if (!hamEntityHandle) {
WARNING(ham_attach)
ham_disconnect(0);
return false;
}
hamConditionHandle = ham_condition(hamEntityHandle, CONDDEATH, "death", 0);
if (!hamConditionHandle) {
WARNING(ham_condition)
ham_detach(hamEntityHandle, 0);
ham_entity_handle_free(hamEntityHandle);
ham_disconnect(0);
return false;
}
qCDebug(QT_REMOTEOBJECT, "HAM initialized for %s (pid = %d)", qPrintable(serverName), pid);
return true;
}
void QQnxNativeServerPrivate::configureHamDeath(int sentChannelId, pid_t toPid, uint64_t uid)
{
char identifier[25];
snprintf(identifier, 25, "%d_%d", toPid, sentChannelId);
ham_action_t *a = ham_action_notify_pulse(hamConditionHandle, identifier,
ND_LOCAL_NODE, toPid,
sentChannelId, NODE_DEATH, 0, 0);
if (!a)
WARNING(ham_action_notify_pulse)
else
hamActions.insert(uid, a);
}
void QQnxNativeServerPrivate::closeHamResources()
{
ham_entity_handle_free(hamEntityHandle);
ham_condition_handle_free(hamConditionHandle);
ham_disconnect(0);
}
#endif
QT_END_NAMESPACE