1
0
mirror of https://bitbucket.org/librepilot/librepilot.git synced 2024-12-02 10:24:11 +01:00

OP-1122 OP-1125 made GCS telemetry and uavtalk a bit more robust (more error case handling)

minor cleanup and code reorganization
some preparatory work to read from IODevice  from its own thread
This commit is contained in:
Philippe Renon 2013-12-08 14:30:51 +01:00
parent 4f9231e247
commit 612e66367a
5 changed files with 196 additions and 104 deletions

View File

@ -230,11 +230,6 @@ void Telemetry::transactionCompleted(UAVObject *obj, bool success)
// Lookup the transaction in the transaction map.
ObjectTransactionInfo *transInfo = findTransaction(obj);
if (transInfo) {
// Remove this transaction as it's complete.
transInfo->timer->stop();
closeTransaction(transInfo);
delete transInfo;
// Send signal
if (success) {
#ifdef VERBOSE_TELEMETRY
qDebug() << "Telemetry - transaction successful for object" << obj->toStringBrief();
@ -242,7 +237,13 @@ void Telemetry::transactionCompleted(UAVObject *obj, bool success)
} else {
qWarning() << "Telemetry - !!! transaction failed for object" << obj->toStringBrief();
}
// Remove this transaction as it's complete.
closeTransaction(transInfo);
// Send signal
obj->emitTransactionCompleted(success);
// Process new object updates from queue
processObjectQueue();
} else {
@ -255,30 +256,33 @@ void Telemetry::transactionCompleted(UAVObject *obj, bool success)
*/
void Telemetry::transactionTimeout(ObjectTransactionInfo *transInfo)
{
transInfo->timer->stop();
// Check if more retries are pending
if (transInfo->retriesRemaining > 0) {
#ifdef VERBOSE_TELEMETRY
qDebug().nospace() << "Telemetry - transaction timed out for object " << transInfo->obj->toStringBrief() << ", retrying...";
#endif
--transInfo->retriesRemaining;
processObjectTransaction(transInfo);
++txRetries;
--transInfo->retriesRemaining;
// Retry the transaction
processObjectTransaction(transInfo);
} else {
// Stop the timer.
transInfo->timer->stop();
qWarning().nospace() << "Telemetry - !!! transaction timed out for object " << transInfo->obj->toStringBrief();
++txErrors;
// Terminate transaction
utalk->cancelTransaction(transInfo->obj);
// Send signal
qWarning().nospace() << "Telemetry - !!! transaction timed out for object " << transInfo->obj->toStringBrief();
transInfo->obj->emitTransactionCompleted(false);
// Remove this transaction as it's complete.
// FIXME : also remove transaction from UAVTalk transaction map
UAVObject *obj = transInfo->obj;
closeTransaction(transInfo);
delete transInfo;
// Send signal
obj->emitTransactionCompleted(false);
// Process new object updates from queue
processObjectQueue();
++txErrors;
}
}
@ -288,24 +292,33 @@ void Telemetry::transactionTimeout(ObjectTransactionInfo *transInfo)
void Telemetry::processObjectTransaction(ObjectTransactionInfo *transInfo)
{
// Initiate transaction
bool sent = false;
if (transInfo->objRequest) {
#ifdef VERBOSE_TELEMETRY
qDebug().nospace() << "Telemetry - sending object request for " << transInfo->obj->toStringBrief() << ", " << (transInfo->allInstances ? "all" : "single") << " " << (transInfo->acked ? "acked" : "");
qDebug().nospace() << "Telemetry - sending request for object " << transInfo->obj->toStringBrief() << ", " << (transInfo->allInstances ? "all" : "single") << " " << (transInfo->acked ? "acked" : "");
#endif
utalk->sendObjectRequest(transInfo->obj, transInfo->allInstances);
sent = utalk->sendObjectRequest(transInfo->obj, transInfo->allInstances);
} else {
#ifdef VERBOSE_TELEMETRY
qDebug().nospace() << "Telemetry - sending object " << transInfo->obj->toStringBrief() << ", " << (transInfo->allInstances ? "all" : "single") << " " << (transInfo->acked ? "acked" : "");
#endif
utalk->sendObject(transInfo->obj, transInfo->acked, transInfo->allInstances);
sent = utalk->sendObject(transInfo->obj, transInfo->acked, transInfo->allInstances);
}
// Start timer if a response is expected
// Check if a response is needed now or will arrive asynchronously
if (transInfo->objRequest || transInfo->acked) {
if (sent) {
// Start timer if a response is expected
transInfo->timer->start(REQ_TIMEOUT_MS);
} else {
// Otherwise, remove this transaction as it's complete.
}
else {
// message was not sent, the transaction will not complete and will timeout
// there is no need to wait to close the transaction and notify of completion failure
//transactionCompleted(transInfo->obj, false);
}
}
else {
// not transacted, so just close the transaction with no notification of completion
closeTransaction(transInfo);
delete transInfo;
}
}
@ -380,7 +393,7 @@ void Telemetry::processObjectQueue()
// TODO make the above logic a reality...
if (findTransaction(objInfo.obj)) {
qWarning() << "Telemetry - !!! Making request for a object " << objInfo.obj->toStringBrief() << " for which a request is already in progress";
objInfo.obj->emitTransactionCompleted(false);
//objInfo.obj->emitTransactionCompleted(false);
return;
}
UAVObject::Metadata metadata = objInfo.obj->getMetadata();
@ -599,6 +612,7 @@ void Telemetry::closeTransaction(ObjectTransactionInfo *trans)
// Keep the map even if it is empty
// There are at most 100 different object IDs...
}
delete trans;
}
void Telemetry::closeAllTransactions()
@ -626,7 +640,7 @@ ObjectTransactionInfo::ObjectTransactionInfo(QObject *parent) : QObject(parent)
telem = 0;
// Setup transaction timer
timer = new QTimer(this);
timer->stop();
timer->setSingleShot(true);
connect(timer, SIGNAL(timeout()), this, SLOT(timeout()));
}

View File

@ -30,8 +30,7 @@
#include <coreplugin/icore.h>
#include <coreplugin/threadmanager.h>
TelemetryManager::TelemetryManager() :
autopilotConnected(false)
TelemetryManager::TelemetryManager() : autopilotConnected(false)
{
moveToThread(Core::ICore::instance()->threadManager()->getRealTimeThread());
// Get UAVObjectManager instance
@ -44,7 +43,8 @@ TelemetryManager::TelemetryManager() :
}
TelemetryManager::~TelemetryManager()
{}
{
}
bool TelemetryManager::isConnected()
{
@ -62,6 +62,29 @@ void TelemetryManager::onStart()
utalk = new UAVTalk(device, objMngr);
telemetry = new Telemetry(utalk, objMngr);
telemetryMon = new TelemetryMonitor(objMngr, telemetry);
if (false) {
// UAVTalk must be thread safe and for that:
// 1- all public methods must lock a mutex
// 2- the reader thread must lock that mutex too
// The reader thread locks the mutex once a packet is read and decoded.
// It is assumed that the UAVObjectManager is thread safe
// Create the reader and move it to the reader thread
IODeviceReader *reader = new IODeviceReader(utalk);
reader->moveToThread(&readerThread);
// The reader will be deleted (later) when the thread finishes
connect(&readerThread, &QThread::finished, reader, &QObject::deleteLater);
// Connect IO device to reader
connect(device, SIGNAL(readyRead()), reader, SLOT(read()));
// start the reader thread
readerThread.start();
}
else {
// Connect IO device to reader
connect(device, SIGNAL(readyRead()), utalk, SLOT(processInputStream()));
}
connect(telemetryMon, SIGNAL(connected()), this, SLOT(onConnect()));
connect(telemetryMon, SIGNAL(disconnected()), this, SLOT(onDisconnect()));
}
@ -70,6 +93,11 @@ void TelemetryManager::onStart()
void TelemetryManager::stop()
{
emit myStop();
if (false) {
readerThread.quit();
readerThread.wait();
}
}
void TelemetryManager::onStop()
@ -92,3 +120,13 @@ void TelemetryManager::onDisconnect()
autopilotConnected = false;
emit disconnected();
}
IODeviceReader::IODeviceReader(UAVTalk *uavTalk) : uavTalk(uavTalk)
{
}
void IODeviceReader::read()
{
uavTalk->processInputStream();
}

View File

@ -66,6 +66,20 @@ private:
TelemetryMonitor *telemetryMon;
QIODevice *device;
bool autopilotConnected;
QThread readerThread;
};
class IODeviceReader : public QObject
{
Q_OBJECT
public:
IODeviceReader(UAVTalk *uavTalk);
UAVTalk *uavTalk;
public slots:
void read();
};
#endif // TELEMETRYMANAGER_H

View File

@ -25,11 +25,13 @@
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "uavtalk.h"
#include <QtEndian>
#include <QDebug>
#include <extensionsystem/pluginmanager.h>
#include <coreplugin/generalsettings.h>
#include <QtEndian>
#include <QDebug>
#include <QEventLoop>
// #define UAVTALK_DEBUG
#ifdef UAVTALK_DEBUG
#define UAVTALK_QXTLOG_DEBUG(args ...)
@ -64,20 +66,13 @@ const quint8 UAVTalk::crc_table[256] = {
/**
* Constructor
*/
UAVTalk::UAVTalk(QIODevice *iodev, UAVObjectManager *objMngr)
UAVTalk::UAVTalk(QIODevice *iodev, UAVObjectManager *objMngr) : io(iodev), objMngr(objMngr), mutex(QMutex::Recursive)
{
io = iodev;
this->objMngr = objMngr;
rxState = STATE_SYNC;
rxPacketLength = 0;
mutex = new QMutex(QMutex::Recursive);
memset(&stats, 0, sizeof(ComStats));
connect(io, SIGNAL(readyRead()), this, SLOT(processInputStream()));
ExtensionSystem::PluginManager *pm = ExtensionSystem::PluginManager::instance();
Core::Internal::GeneralSettings *settings = pm->getObject<Core::Internal::GeneralSettings>();
useUDPMirror = settings->useUDPMirror();
@ -94,9 +89,9 @@ UAVTalk::UAVTalk(QIODevice *iodev, UAVObjectManager *objMngr)
UAVTalk::~UAVTalk()
{
// According to Qt, it is not necessary to disconnect upon
// object deletion.
// disconnect(io, SIGNAL(readyRead()), this, SLOT(processInputStream()));
// According to Qt, it is not necessary to disconnect upon object deletion.
// disconnect(io, SIGNAL(readyRead()), worker, SLOT(processInputStream()));
closeAllTransactions();
}
@ -106,7 +101,7 @@ UAVTalk::~UAVTalk()
*/
void UAVTalk::resetStats()
{
QMutexLocker locker(mutex);
QMutexLocker locker(&mutex);
memset(&stats, 0, sizeof(ComStats));
}
@ -116,26 +111,11 @@ void UAVTalk::resetStats()
*/
UAVTalk::ComStats UAVTalk::getStats()
{
QMutexLocker locker(mutex);
QMutexLocker locker(&mutex);
return stats;
}
/**
* Called each time there are data in the input buffer
*/
void UAVTalk::processInputStream()
{
quint8 tmp;
if (io && io->isReadable()) {
while (io->bytesAvailable() > 0) {
io->read((char *)&tmp, 1);
processInputByte(tmp);
}
}
}
void UAVTalk::dummyUDPRead()
{
QUdpSocket *socket = qobject_cast<QUdpSocket *>(sender());
@ -147,6 +127,35 @@ void UAVTalk::dummyUDPRead()
}
}
/**
* Send the specified object through the telemetry link.
* \param[in] obj Object to send
* \param[in] acked Selects if an ack is required
* \param[in] allInstances If set true then all instances will be updated
* \return Success (true), Failure (false)
*/
bool UAVTalk::sendObject(UAVObject *obj, bool acked, bool allInstances)
{
QMutexLocker locker(&mutex);
quint16 instId = 0;
if (allInstances) {
instId = ALL_INSTANCES;
}
else if (obj) {
instId = obj->getInstID();
}
bool success = false;
if (acked) {
success = objectTransaction(TYPE_OBJ_ACK, obj->getObjID(), instId, obj);
} else {
success = objectTransaction(TYPE_OBJ, obj->getObjID(), instId, obj);
}
return success;
}
/**
* Request an update for the specified object, on success the object data would have been
* updated by the GCS.
@ -156,7 +165,7 @@ void UAVTalk::dummyUDPRead()
*/
bool UAVTalk::sendObjectRequest(UAVObject *obj, bool allInstances)
{
QMutexLocker locker(mutex);
QMutexLocker locker(&mutex);
quint16 instId = 0;
@ -169,38 +178,12 @@ bool UAVTalk::sendObjectRequest(UAVObject *obj, bool allInstances)
return objectTransaction(TYPE_OBJ_REQ, obj->getObjID(), instId, obj);
}
/**
* Send the specified object through the telemetry link.
* \param[in] obj Object to send
* \param[in] acked Selects if an ack is required
* \param[in] allInstances If set true then all instances will be updated
* \return Success (true), Failure (false)
*/
bool UAVTalk::sendObject(UAVObject *obj, bool acked, bool allInstances)
{
QMutexLocker locker(mutex);
quint16 instId = 0;
if (allInstances) {
instId = ALL_INSTANCES;
}
else if (obj) {
instId = obj->getInstID();
}
if (acked) {
return objectTransaction(TYPE_OBJ_ACK, obj->getObjID(), instId, obj);
} else {
return objectTransaction(TYPE_OBJ, obj->getObjID(), instId, obj);
}
}
/**
* Cancel a pending transaction
*/
void UAVTalk::cancelTransaction(UAVObject *obj)
{
QMutexLocker locker(mutex);
QMutexLocker locker(&mutex);
if (io.isNull()) {
return;
@ -240,6 +223,21 @@ bool UAVTalk::objectTransaction(quint8 type, quint32 objId, quint16 instId, UAVO
}
}
/**
* Called each time there are data in the input buffer
*/
void UAVTalk::processInputStream()
{
quint8 tmp;
if (io && io->isReadable()) {
while (io->bytesAvailable() > 0) {
io->read((char *)&tmp, 1);
processInputByte(tmp);
}
}
}
/**
* Process an byte from the telemetry stream.
* \param[in] rxbyte Received byte
@ -446,18 +444,18 @@ bool UAVTalk::processInputByte(quint8 rxbyte)
break;
}
mutex->lock();
mutex.lock();
receiveObject(rxType, rxObjId, rxInstId, rxBuffer, rxLength);
if (useUDPMirror) {
udpSocketTx->writeDatagram(rxDataArray, QHostAddress::LocalHost, udpSocketRx->localPort());
}
stats.rxObjectBytes += rxLength;
stats.rxObjects++;
mutex->unlock();
mutex.unlock();
if (useUDPMirror) {
udpSocketTx->writeDatagram(rxDataArray, QHostAddress::LocalHost, udpSocketRx->localPort());
}
rxState = STATE_SYNC;
UAVTALK_QXTLOG_DEBUG("UAVTalk: CSum->Sync (OK)");
@ -661,7 +659,17 @@ void UAVTalk::updateAck(quint8 type, quint32 objId, quint16 instId, UAVObject *o
}
Transaction *trans = findTransaction(objId, instId);
if (trans && trans->respType == type) {
if (trans->respInstId != ALL_INSTANCES || instId == 0) {
if (trans->respInstId == ALL_INSTANCES) {
if (instId == 0) {
// last instance received, complete transaction
closeTransaction(trans);
emit transactionCompleted(obj, true);
}
else {
// TODO extend timeout?
}
}
else {
closeTransaction(trans);
emit transactionCompleted(obj, true);
}
@ -705,14 +713,27 @@ bool UAVTalk::transmitObject(quint8 type, quint32 objId, quint16 instId, UAVObje
// Process message type
if (type == TYPE_OBJ || type == TYPE_OBJ_ACK) {
if (allInstances) {
// Get number of instances
quint32 numInst = objMngr->getNumInstances(objId);
// Send all instances in reverse order
// This allows the receiver to detect when the last object has been received (i.e. when instance 0 is received)
quint32 numInst = objMngr->getNumInstances(objId);
for (quint32 n = 0; n < numInst; ++n) {
quint32 i = numInst - n - 1;
UAVObject *o = objMngr->getObject(objId, i);
transmitSingleObject(type, objId, i, o);
if (!transmitSingleObject(type, objId, i, o)) {
return false;
}
if (false) {
// yield
mutex.unlock();
// going back to the event loop is necessary to allow timeout events to be fired
// but don't allow user events as the event can cause the main thread to reenter UAVTalk
// the timer event suffers from the same issue but this case is handled
//QCoreApplication::processEvents(QEventLoop::ExcludeUserInputEvents, 10);
QThread::msleep(1);
mutex.lock();
}
}
return true;
} else {

View File

@ -27,19 +27,22 @@
#ifndef UAVTALK_H
#define UAVTALK_H
#include "uavobjectmanager.h"
#include "uavtalk_global.h"
#include <QtCore>
#include <QIODevice>
#include <QMutex>
#include <QMutexLocker>
#include <QMap>
#include <QSemaphore>
#include "uavobjectmanager.h"
#include "uavtalk_global.h"
#include <QThread>
#include <QtNetwork/QUdpSocket>
class UAVTALK_EXPORT UAVTalk : public QObject {
Q_OBJECT
friend class IODeviceReader;
public:
static const quint16 ALL_INSTANCES = 0xFFFF;
@ -56,17 +59,19 @@ public:
UAVTalk(QIODevice *iodev, UAVObjectManager *objMngr);
~UAVTalk();
ComStats getStats();
void resetStats();
bool sendObject(UAVObject *obj, bool acked, bool allInstances);
bool sendObjectRequest(UAVObject *obj, bool allInstances);
void cancelTransaction(UAVObject *obj);
ComStats getStats();
void resetStats();
signals:
void transactionCompleted(UAVObject *obj, bool success);
private slots:
void processInputStream(void);
void processInputStream();
void dummyUDPRead();
private:
@ -106,7 +111,7 @@ private:
// Variables
QPointer<QIODevice> io;
UAVObjectManager *objMngr;
QMutex *mutex;
QMutex mutex;
QMap<quint32, QMap<quint32, Transaction *> *> transMap;
quint8 rxBuffer[MAX_PACKET_LENGTH];
quint8 txBuffer[MAX_PACKET_LENGTH];