| /**************************************************************************** |
| ** |
| ** Copyright (C) 2017-2016 Ford Motor Company |
| ** Contact: https://www.qt.io/licensing/ |
| ** |
| ** This file is part of the QtRemoteObjects module of the Qt Toolkit. |
| ** |
| ** $QT_BEGIN_LICENSE:LGPL$ |
| ** Commercial License Usage |
| ** Licensees holding valid commercial Qt licenses may use this file in |
| ** accordance with the commercial license agreement provided with the |
| ** Software or, alternatively, in accordance with the terms contained in |
| ** a written agreement between you and The Qt Company. For licensing terms |
| ** and conditions see https://www.qt.io/terms-conditions. For further |
| ** information use the contact form at https://www.qt.io/contact-us. |
| ** |
| ** GNU Lesser General Public License Usage |
| ** Alternatively, this file may be used under the terms of the GNU Lesser |
| ** General Public License version 3 as published by the Free Software |
| ** Foundation and appearing in the file LICENSE.LGPL3 included in the |
| ** packaging of this file. Please review the following information to |
| ** ensure the GNU Lesser General Public License version 3 requirements |
| ** will be met: https://www.gnu.org/licenses/lgpl-3.0.html. |
| ** |
| ** GNU General Public License Usage |
| ** Alternatively, this file may be used under the terms of the GNU |
| ** General Public License version 2.0 or (at your option) the GNU General |
| ** Public license version 3 or any later version approved by the KDE Free |
| ** Qt Foundation. The licenses are as published by the Free Software |
| ** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3 |
| ** included in the packaging of this file. Please review the following |
| ** information to ensure the GNU General Public License requirements will |
| ** be met: https://www.gnu.org/licenses/gpl-2.0.html and |
| ** https://www.gnu.org/licenses/gpl-3.0.html. |
| ** |
| ** $QT_END_LICENSE$ |
| ** |
| ****************************************************************************/ |
| |
| #include "qconnection_qnx_global_p.h" |
| #include "qconnection_qnx_qiodevices.h" |
| #include "qconnection_qnx_server.h" |
| #include "qconnection_qnx_qiodevices_p.h" |
| |
| QT_BEGIN_NAMESPACE |
| |
| QQnxNativeIoPrivate::QQnxNativeIoPrivate() |
| : QIODevicePrivate() |
| , serverId(-1) |
| , channelId(-1) |
| , connectionId(-1) |
| , state(QAbstractSocket::UnconnectedState) |
| , obuffer(new QRingBuffer) |
| , thread(this, QStringLiteral("NativeIo")) |
| { |
| //This lets us set msgType before any MsgSend |
| //and have the value sent as the header/type. |
| SETIOV(tx_iov + 0, &msgType, sizeof(msgType)); |
| SIGEV_NONE_INIT(&tx_pulse); |
| } |
| |
| QQnxNativeIoPrivate::~QQnxNativeIoPrivate() |
| { |
| teardownConnection(); |
| } |
| |
| bool QQnxNativeIoPrivate::establishConnection() |
| { |
| //On the client side, we need to create the channel/connection |
| //to listen for the server's send pulse. |
| if (channelId == -1) { |
| const int channel = ChannelCreate(0); |
| if (channel == -1) { |
| WARNING(ChannelCreate) |
| return false; |
| } |
| channelId = channel; |
| } |
| |
| const int connection = ConnectAttach(ND_LOCAL_NODE, 0, channelId, _NTO_SIDE_CHANNEL, 0); |
| if (connection == -1) { |
| WARNING(ConnectAttach) |
| teardownConnection(); |
| return false; |
| } |
| connectionId = connection; |
| |
| SIGEV_PULSE_INIT(&tx_pulse, connection, SIGEV_PULSE_PRIO_INHERIT, SOURCE_TX_RQ, 0); |
| SIGEV_MAKE_UPDATEABLE(&tx_pulse); |
| qCDebug(QT_REMOTEOBJECT) << "in establish" << tx_pulse.sigev_code << SOURCE_TX_RQ; |
| |
| if (MsgRegisterEvent(&tx_pulse, connectionId) == -1) { |
| qCWarning(QT_REMOTEOBJECT) << "Unable to register event for server" << serverName; |
| teardownConnection(); |
| return false; |
| } |
| |
| const int id = name_open(qPrintable(serverName), 0); |
| if (id == -1) { |
| qCWarning(QT_REMOTEOBJECT) << "Unable to connect to server" << serverName; |
| teardownConnection(); |
| return false; |
| } |
| serverId = id; |
| |
| thread.start(); |
| |
| Q_Q(QQnxNativeIo); |
| qCDebug(QT_REMOTEOBJECT) << "Before INIT: ServerId" << id << "name" << serverName << q; |
| SETIOV(tx_iov + 1, &tx_pulse, sizeof(tx_pulse)); |
| SETIOV(tx_iov + 2, &channelId, sizeof(channelId)); |
| |
| //Send our registration message to the server |
| msgType = MsgType::REPLICA_INIT; |
| //We send tx_pulse along. When the server want to |
| //transmit data, it sends the pulse back to us. |
| //When we see that (in receive_thread) we send a |
| //message and get the server's tx data in the reply. |
| // |
| //We transmit the channelId as well - it is only used |
| //if HAM is enabled, but this will prevent a possible |
| //mismatch between code compiled with vs. without HAM |
| //(which could happen if we ever use QCONN between |
| //devices) |
| if (MsgSendv(serverId, tx_iov, 3, nullptr, 0) == -1) { |
| WARNING(MsgSendv) |
| teardownConnection(); |
| return false; |
| } |
| |
| state = QAbstractSocket::ConnectedState; |
| emit q->stateChanged(state); |
| |
| return true; |
| } |
| |
| void QQnxNativeIoPrivate::teardownConnection() |
| { |
| Q_Q(QQnxNativeIo); |
| state = QAbstractSocket::ClosingState; |
| emit q->stateChanged(state); |
| |
| stopThread(); |
| |
| state = QAbstractSocket::UnconnectedState; |
| emit q->stateChanged(state); |
| } |
| |
| void QQnxNativeIoPrivate::stopThread() |
| { |
| if (thread.isRunning()) { |
| for (int count = 0; count < MAX_RETRIES; ++count) { |
| if (MsgSendPulse(connectionId, -1, TERMINATE, 0) == -1) { |
| if (errno == EAGAIN) { |
| usleep(5000 + (rand() % 10) * 10000); //5 to 95 msec |
| qCWarning(QT_REMOTEOBJECT) << "Retrying terminate pulse"; |
| continue; |
| } |
| qFatal("MsgSendPulse failed on terminate pulse. Error = %s", strerror(errno)); |
| } |
| thread.wait(); |
| return; |
| } |
| if (errno == EAGAIN) |
| qFatal("MsgSendPulse failed on terminate pulse (max retries)"); |
| } |
| } |
| |
| // method (run in a thread) to watch for connections and handle receiving data |
| void QQnxNativeIoPrivate::thread_func() |
| { |
| Q_Q(QQnxNativeIo); |
| |
| _pulse pulse; |
| |
| qCDebug(QT_REMOTEOBJECT) << "Client thread_func running"; |
| |
| bool running = true; |
| int nTxRequestToIgnore = 0; |
| while (running) { |
| int rcvid = MsgReceivePulse(channelId, &pulse, sizeof(pulse), nullptr); |
| if (rcvid == -1) |
| continue; |
| |
| qCDebug(QT_REMOTEOBJECT) << "MsgReceivePulse unblocked, code =" << pulse.code; |
| |
| switch (pulse.code) { |
| case SOURCE_TX_RQ: //The Source object wants to send us data |
| { |
| const int len = pulse.value.sival_int; |
| qCDebug(QT_REMOTEOBJECT, "TX request with len = %d, tx ignore = %d", len, nTxRequestToIgnore); |
| if (nTxRequestToIgnore) { |
| --nTxRequestToIgnore; |
| break; |
| } |
| |
| int bytesLeft; |
| |
| msgType = MsgType::SOURCE_TX_RESP; |
| ibLock.lockForWrite(); |
| iov_t reply_vector[2]; |
| SETIOV(reply_vector, &bytesLeft, sizeof(bytesLeft)); |
| SETIOV(reply_vector+1, buffer.reserve(len), len); |
| const int res = MsgSendv(serverId, tx_iov, 1, reply_vector, 2); |
| |
| if (res == -1) { |
| buffer.chop(len); |
| ibLock.unlock(); |
| WARNING(MsgSendv); |
| break; |
| } |
| ibLock.unlock(); |
| |
| qCDebug(QT_REMOTEOBJECT) << "Reply said bytesLeft =" << bytesLeft; |
| |
| if (bytesLeft) { |
| msgType = MsgType::SOURCE_TX_RESP_REPEAT; |
| ibLock.lockForWrite(); |
| SETIOV(reply_vector, &nTxRequestToIgnore, sizeof(nTxRequestToIgnore)); |
| SETIOV(reply_vector+1, buffer.reserve(bytesLeft), bytesLeft); |
| const int res = MsgSendv(serverId, tx_iov, 1, reply_vector, 2); |
| if (res == -1) { |
| buffer.chop(bytesLeft); |
| ibLock.unlock(); |
| WARNING(MsgSendv); |
| break; |
| } |
| ibLock.unlock(); |
| qCDebug(QT_REMOTEOBJECT) << "Reply2 said nTxRequestToIgnore =" << nTxRequestToIgnore; |
| } |
| |
| QMetaObject::invokeMethod(q, "readyRead", Qt::QueuedConnection); |
| } |
| break; |
| case REPLICA_WRITE: //Our node has data to send |
| { |
| const int len = pulse.value.sival_int; |
| obLock.lockForWrite(); //NAR (Not-An-Error) |
| const QByteArray payload = obuffer->read(); |
| obLock.unlock(); |
| Q_ASSERT(len == payload.length()); |
| |
| msgType = MsgType::REPLICA_TX_RECV; |
| SETIOV(tx_iov + 1, payload.constData(), len); |
| if (MsgSendvs(serverId, tx_iov, 2, nullptr, 0) == -1) { |
| WARNING(MsgSendvs); |
| obLock.lockForWrite(); |
| if (obuffer->isEmpty()) { |
| obuffer->append(payload); |
| } else { |
| //Since QRingBuffer just holds a QList of |
| //QByteArray, copying the QByteArrays to |
| //another container is cheap. |
| QRingBuffer *newBuffer = new QRingBuffer; |
| newBuffer->append(payload); |
| while (!obuffer->isEmpty()) |
| newBuffer->append(obuffer->read()); |
| obuffer.reset(newBuffer); |
| } |
| obLock.unlock(); |
| WARNING(MsgSendvs); |
| } |
| } |
| break; |
| case TERMINATE: |
| running = false; |
| continue; |
| case NODE_DEATH: |
| qCWarning(QT_REMOTEOBJECT) << "Host node died"; |
| running = false; |
| emit q->error(QAbstractSocket::RemoteHostClosedError); |
| continue; |
| default: |
| /* some other unexpected message */ |
| qCWarning(QT_REMOTEOBJECT) << "unexpected pulse type:" << pulse.type << __FILE__ << __LINE__; |
| WARN_ON_ERROR(MsgError, rcvid, ENOSYS) |
| break; |
| } |
| } |
| |
| if (serverId >= 0) { |
| WARN_ON_ERROR(name_close, serverId) |
| serverId = -1; |
| } |
| |
| if (tx_pulse.sigev_notify & SIGEV_FLAG_HANDLE) { |
| WARN_ON_ERROR(MsgUnregisterEvent, &tx_pulse); |
| SIGEV_NONE_INIT(&tx_pulse); |
| } |
| |
| if (connectionId >= 0) { |
| WARN_ON_ERROR(ConnectDetach, connectionId) |
| connectionId = -1; |
| } |
| |
| if (channelId >= 0) { |
| WARN_ON_ERROR(ChannelDestroy, channelId) |
| channelId = -1; |
| } |
| |
| qCDebug(QT_REMOTEOBJECT) << "Client thread_func stopped"; |
| } |
| |
| QQnxNativeIo::QQnxNativeIo(QObject *parent) |
| : QIODevice(*new QQnxNativeIoPrivate, parent) |
| { |
| } |
| |
| QQnxNativeIo::~QQnxNativeIo() |
| { |
| close(); |
| } |
| |
| bool QQnxNativeIo::connectToServer(QIODevice::OpenMode openMode) |
| { |
| Q_D(QQnxNativeIo); |
| Q_UNUSED(openMode); |
| |
| if (state() == QAbstractSocket::ConnectedState || |
| state() == QAbstractSocket::ConnectingState) { |
| setErrorString(QStringLiteral("Already connected")); |
| emit error(QAbstractSocket::OperationError); |
| return false; |
| } |
| |
| const int omMask = QIODevice::Append | QIODevice::Truncate | QIODevice::Text; |
| if (openMode & omMask) |
| qCWarning(QT_REMOTEOBJECT, "Tried to open using unsupported open mode flags."); |
| |
| d->errorString.clear(); |
| d->state = QAbstractSocket::ConnectingState; |
| emit stateChanged(d->state); |
| |
| if (d->serverName.isEmpty()) { |
| setErrorString(QStringLiteral("serverName not set")); |
| emit error(QAbstractSocket::HostNotFoundError); |
| return false; |
| } |
| |
| QIODevice::open(openMode & (~omMask)); |
| |
| if (!d->establishConnection()) { |
| QIODevice::close(); |
| qCWarning(QT_REMOTEOBJECT, "Failed to connect to server"); |
| emit error(QAbstractSocket::UnknownSocketError); |
| return false; |
| } |
| |
| emit stateChanged(d->state); |
| |
| return true; |
| } |
| |
| bool QQnxNativeIo::connectToServer(const QString &name, QIODevice::OpenMode openMode) |
| { |
| setServerName(name); |
| return connectToServer(openMode); |
| } |
| |
| void QQnxNativeIo::disconnectFromServer() |
| { |
| close(); |
| } |
| |
| void QQnxNativeIo::setServerName(const QString &name) |
| { |
| Q_D(QQnxNativeIo); |
| |
| if (d->state != QAbstractSocket::UnconnectedState) { |
| qCWarning(QT_REMOTEOBJECT) << "QQnxNativeIo::setServerName() called while not unconnected"; |
| return; |
| } |
| |
| d->serverName = name; |
| } |
| |
| QString QQnxNativeIo::serverName() const |
| { |
| Q_D(const QQnxNativeIo); |
| return d->serverName; |
| } |
| |
| void QQnxNativeIo::abort() |
| { |
| Q_D(QQnxNativeIo); |
| |
| d->stopThread(); |
| //Don't need mutex since thread is stopped |
| d->obuffer->clear(); |
| d->buffer.clear(); |
| |
| d->state = QAbstractSocket::UnconnectedState; |
| emit stateChanged(d->state); |
| QIODevice::close(); |
| } |
| |
| bool QQnxNativeIo::isSequential() const |
| { |
| return true; |
| } |
| |
| qint64 QQnxNativeIo::bytesAvailable() const |
| { |
| Q_D(const QQnxNativeIo); |
| |
| d->ibLock.lockForRead(); |
| qint64 size = d->buffer.size(); |
| d->ibLock.unlock(); |
| |
| return size; |
| } |
| |
| qint64 QQnxNativeIo::bytesToWrite() const |
| { |
| Q_D(const QQnxNativeIo); |
| |
| d->obLock.lockForRead(); |
| qint64 size = d->obuffer->size(); |
| d->obLock.unlock(); |
| |
| return size; |
| } |
| |
| bool QQnxNativeIo::open(QIODevice::OpenMode openMode) |
| { |
| const int omMask = QIODevice::Append | QIODevice::Truncate | QIODevice::Text; |
| if (openMode & omMask) |
| qCWarning(QT_REMOTEOBJECT, "Tried to open using unsupported open mode flags."); |
| |
| return connectToServer(openMode & (~omMask)); |
| } |
| |
| void QQnxNativeIo::close() |
| { |
| Q_D(QQnxNativeIo); |
| |
| if (!isOpen()) |
| return; |
| |
| d->teardownConnection(); |
| |
| d->obuffer->clear(); |
| d->buffer.clear(); |
| QIODevice::close(); |
| } |
| |
| QAbstractSocket::SocketState QQnxNativeIo::state() const |
| { |
| Q_D(const QQnxNativeIo); |
| return d->state; |
| } |
| |
| bool QQnxNativeIo::waitForBytesWritten(int msecs) |
| { |
| //TODO - This method isn't used by Qt Remote Objects, but would |
| //need to be implemented before this class could be used as a |
| //generic QIODevice. |
| Q_UNUSED(msecs); |
| Q_ASSERT(false); |
| return false; |
| } |
| |
| bool QQnxNativeIo::waitForReadyRead(int msecs) |
| { |
| //TODO - This method isn't used by Qt Remote Objects, but would |
| //need to be implemented before this class could be used as a |
| //generic QIODevice. |
| Q_UNUSED(msecs); |
| Q_ASSERT(false); |
| return false; |
| } |
| |
| qint64 QQnxNativeIo::readData(char *data, qint64 size) |
| { |
| Q_D(QQnxNativeIo); |
| qint64 read; |
| |
| if (!isReadable()) |
| return 0; |
| |
| d->ibLock.lockForWrite(); //NAR (Not-An-Error) |
| read = d->buffer.read(data, size); |
| d->ibLock.unlock(); |
| |
| return read; |
| } |
| |
| qint64 QQnxNativeIo::writeData(const char *data, qint64 size) |
| { |
| Q_D(QQnxNativeIo); |
| |
| if (!isWritable()) |
| return 0; |
| |
| if (size < 0 || size > INT_MAX) { |
| qCWarning(QT_REMOTEOBJECT) << "Invalid size (" << size << ") passed to QtRO QNX backend writeData()."; |
| return -1; |
| } |
| |
| int isize = static_cast<int>(size); |
| |
| d->obLock.lockForWrite(); |
| d->obuffer->append(QByteArray(data, isize)); |
| d->obLock.unlock(); |
| |
| WARN_AND_RETURN_ON_ERROR(MsgSendPulse, -1, d->connectionId, -1, PulseType::REPLICA_WRITE, isize) |
| |
| return size; |
| } |
| |
| /* QIOQnxSource ***************************************************************/ |
| |
| QIOQnxSourcePrivate::QIOQnxSourcePrivate(int _rcvid) |
| : QIODevicePrivate() |
| , rcvid(_rcvid) |
| , state(QAbstractSocket::ConnectedState) |
| { |
| } |
| |
| QIOQnxSource::QIOQnxSource(int rcvid, QObject *parent) |
| : QIODevice(*new QIOQnxSourcePrivate(rcvid), parent) |
| { |
| setOpenMode(QIODevice::ReadWrite); |
| } |
| |
| QIOQnxSource::~QIOQnxSource() |
| { |
| close(); |
| } |
| |
| bool QIOQnxSource::isSequential() const |
| { |
| return true; |
| } |
| |
| qint64 QIOQnxSource::bytesAvailable() const |
| { |
| Q_D(const QIOQnxSource); |
| |
| d->ibLock.lockForRead(); |
| qint64 size = d->buffer.size(); |
| d->ibLock.unlock(); |
| |
| return size; |
| } |
| |
| qint64 QIOQnxSource::bytesToWrite() const |
| { |
| Q_D(const QIOQnxSource); |
| |
| d->obLock.lockForRead(); |
| qint64 size = d->obuffer.size(); |
| d->obLock.unlock(); |
| |
| return size; |
| } |
| |
| bool QIOQnxSource::open(QIODevice::OpenMode openMode) |
| { |
| Q_UNUSED(openMode); |
| return false; |
| } |
| |
| void QIOQnxSource::onDisconnected() |
| { |
| close(); |
| emit disconnected(); |
| } |
| |
| void QIOQnxSource::close() |
| { |
| Q_D(QIOQnxSource); |
| |
| if (!isOpen()) |
| return; |
| |
| d->state = QAbstractSocket::ClosingState; |
| emit stateChanged(d->state); |
| |
| d->state = QAbstractSocket::UnconnectedState; |
| emit stateChanged(d->state); |
| |
| d->obuffer.clear(); |
| d->buffer.clear(); |
| QIODevice::close(); |
| } |
| |
| QAbstractSocket::SocketState QIOQnxSource::state() const |
| { |
| Q_D(const QIOQnxSource); |
| return d->state; |
| } |
| |
| bool QIOQnxSource::waitForBytesWritten(int msecs) |
| { |
| //TODO - This method isn't used by Qt Remote Objects, but would |
| //need to be implemented before this class could be used as a |
| //generic QIODevice. |
| Q_UNUSED(msecs); |
| Q_ASSERT(false); |
| return false; |
| } |
| |
| bool QIOQnxSource::waitForReadyRead(int msecs) |
| { |
| //TODO - This method isn't used by Qt Remote Objects, but would |
| //need to be implemented before this class could be used as a |
| //generic QIODevice. |
| Q_UNUSED(msecs); |
| Q_ASSERT(false); |
| return false; |
| } |
| |
| qint64 QIOQnxSource::readData(char *data, qint64 size) |
| { |
| Q_D(QIOQnxSource); |
| qint64 read; |
| |
| if (!isReadable()) |
| return 0; |
| |
| d->ibLock.lockForWrite(); //NAR (Not-An-Error) |
| read = d->buffer.read(data, size); |
| d->ibLock.unlock(); |
| |
| return read; |
| } |
| |
| qint64 QIOQnxSource::writeData(const char *data, qint64 size) |
| { |
| Q_D(QIOQnxSource); |
| |
| if (!isWritable()) |
| return 0; |
| |
| if (size < 0 || size > INT_MAX) { |
| qCWarning(QT_REMOTEOBJECT) << "Invalid size (" << size << ") passed to QtRO QNX backend writeData()."; |
| return -1; |
| } |
| |
| int isize = static_cast<int>(size); |
| |
| d->obLock.lockForWrite(); |
| d->obuffer.append(QByteArray(data, isize)); |
| d->obLock.unlock(); |
| |
| if (!d->m_serverClosing.load()) { |
| d->m_event.sigev_value.sival_int = isize; |
| WARN_ON_ERROR(MsgDeliverEvent, d->rcvid, &(d->m_event)) |
| } |
| |
| return size; |
| } |
| |
| QT_END_NAMESPACE |