mirror of
https://bitbucket.org/librepilot/librepilot.git
synced 2025-01-18 03:52:11 +01:00
Added support for multiple transactions to GCS telemetry.
This commit is contained in:
parent
9870abcd2f
commit
952c6d9f32
@ -55,11 +55,6 @@ Telemetry::Telemetry(UAVTalk* utalk, UAVObjectManager* objMngr)
|
||||
connect(utalk, SIGNAL(transactionCompleted(UAVObject*,bool)), this, SLOT(transactionCompleted(UAVObject*,bool)));
|
||||
// Get GCS stats object
|
||||
gcsStatsObj = GCSTelemetryStats::GetInstance(objMngr);
|
||||
// Setup transaction timer
|
||||
transPending = false;
|
||||
transTimer = new QTimer(this);
|
||||
transTimer->stop();
|
||||
connect(transTimer, SIGNAL(timeout()), this, SLOT(transactionTimeout()));
|
||||
// Setup and start the periodic timer
|
||||
timeToNextUpdateMs = 0;
|
||||
updateTimer = new QTimer(this);
|
||||
@ -70,6 +65,12 @@ Telemetry::Telemetry(UAVTalk* utalk, UAVObjectManager* objMngr)
|
||||
txRetries = 0;
|
||||
}
|
||||
|
||||
Telemetry::~Telemetry()
|
||||
{
|
||||
for (QMap<quint32, ObjectTransactionInfo*>::iterator itr = transMap.begin(); itr != transMap.end(); ++itr)
|
||||
delete itr.value();
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a new object for periodic updates (if enabled)
|
||||
*/
|
||||
@ -231,84 +232,86 @@ void Telemetry::updateObject(UAVObject* obj, quint32 eventType)
|
||||
*/
|
||||
void Telemetry::transactionCompleted(UAVObject* obj, bool success)
|
||||
{
|
||||
// Check if there is a pending transaction and the objects match
|
||||
if ( transPending && transInfo.obj->getObjID() == obj->getObjID() )
|
||||
// Lookup the transaction in the transaction map.
|
||||
quint32 objId = obj->getObjID();
|
||||
QMap<quint32, ObjectTransactionInfo*>::iterator itr = transMap.find(objId);
|
||||
if ( itr != transMap.end() )
|
||||
{
|
||||
// qDebug() << QString("Telemetry: transaction completed for %1").arg(obj->getName());
|
||||
// Complete transaction
|
||||
transTimer->stop();
|
||||
transPending = false;
|
||||
ObjectTransactionInfo *transInfo = itr.value();
|
||||
//qDebug() << QString("Telemetry: transaction completed for %1").arg(obj->getName());
|
||||
// Remove this transaction as it's complete.
|
||||
transInfo->timer->stop();
|
||||
transMap.remove(objId);
|
||||
delete transInfo;
|
||||
// Send signal
|
||||
obj->emitTransactionCompleted(success);
|
||||
// Process new object updates from queue
|
||||
processObjectQueue();
|
||||
} else
|
||||
{
|
||||
// qDebug() << "Error: received a transaction completed when did not expect it.";
|
||||
qDebug() << "Error: received a transaction completed when did not expect it.";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a transaction is not completed within the timeout period (timer event)
|
||||
*/
|
||||
void Telemetry::transactionTimeout()
|
||||
bool Telemetry::transactionTimeout(ObjectTransactionInfo *transInfo)
|
||||
{
|
||||
// qDebug() << "Telemetry: transaction timeout.";
|
||||
transTimer->stop();
|
||||
// Proceed only if there is a pending transaction
|
||||
if ( transPending )
|
||||
//qDebug() << "Telemetry: transaction timeout.";
|
||||
|
||||
transInfo->timer->stop();
|
||||
// Check if more retries are pending
|
||||
if (transInfo->retriesRemaining > 0)
|
||||
{
|
||||
// Check if more retries are pending
|
||||
if (transInfo.retriesRemaining > 0)
|
||||
{
|
||||
--transInfo.retriesRemaining;
|
||||
processObjectTransaction();
|
||||
++txRetries;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Terminate transaction
|
||||
utalk->cancelTransaction();
|
||||
transPending = false;
|
||||
// Send signal
|
||||
transInfo.obj->emitTransactionCompleted(false);
|
||||
// Process new object updates from queue
|
||||
processObjectQueue();
|
||||
++txErrors;
|
||||
}
|
||||
--transInfo->retriesRemaining;
|
||||
processObjectTransaction(transInfo);
|
||||
++txRetries;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Stop the timer.
|
||||
transInfo->timer->stop();
|
||||
// Terminate transaction
|
||||
utalk->cancelTransaction(transInfo->obj);
|
||||
// Send signal
|
||||
transInfo->obj->emitTransactionCompleted(false);
|
||||
// Remove this transaction as it's complete.
|
||||
transMap.remove(transInfo->obj->getObjID());
|
||||
delete transInfo;
|
||||
// Process new object updates from queue
|
||||
processObjectQueue();
|
||||
++txErrors;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start an object transaction with UAVTalk, all information is stored in transInfo
|
||||
*/
|
||||
void Telemetry::processObjectTransaction()
|
||||
void Telemetry::processObjectTransaction(ObjectTransactionInfo *transInfo)
|
||||
{
|
||||
if (transPending)
|
||||
|
||||
//qDebug() << tr("Process Object transaction for %1").arg(transInfo->obj->getName());
|
||||
// Initiate transaction
|
||||
if (transInfo->objRequest)
|
||||
{
|
||||
// qDebug() << tr("Process Object transaction for %1").arg(transInfo.obj->getName());
|
||||
// Initiate transaction
|
||||
if (transInfo.objRequest)
|
||||
{
|
||||
utalk->sendObjectRequest(transInfo.obj, transInfo.allInstances);
|
||||
}
|
||||
else
|
||||
{
|
||||
utalk->sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
|
||||
}
|
||||
// Start timer if a response is expected
|
||||
if ( transInfo.objRequest || transInfo.acked )
|
||||
{
|
||||
transTimer->start(REQ_TIMEOUT_MS);
|
||||
}
|
||||
else
|
||||
{
|
||||
transTimer->stop();
|
||||
transPending = false;
|
||||
}
|
||||
} else
|
||||
utalk->sendObjectRequest(transInfo->obj, transInfo->allInstances);
|
||||
}
|
||||
else
|
||||
{
|
||||
// qDebug() << "Error: inside of processObjectTransaction with no transPending";
|
||||
utalk->sendObject(transInfo->obj, transInfo->acked, transInfo->allInstances);
|
||||
}
|
||||
// Start timer if a response is expected
|
||||
if ( transInfo->objRequest || transInfo->acked )
|
||||
{
|
||||
transInfo->timer->start(REQ_TIMEOUT_MS);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Otherwise, remove this transaction as it's complete.
|
||||
transInfo->timer->stop();
|
||||
transMap.remove(transInfo->obj->getObjID());
|
||||
delete transInfo;
|
||||
}
|
||||
}
|
||||
|
||||
@ -317,8 +320,9 @@ void Telemetry::processObjectTransaction()
|
||||
*/
|
||||
void Telemetry::processObjectUpdates(UAVObject* obj, EventMask event, bool allInstances, bool priority)
|
||||
{
|
||||
//qDebug() << "Push event into queue for obj " << QString("%1 event %2").arg(obj->getName()).arg(event);
|
||||
|
||||
// Push event into queue
|
||||
// qDebug() << "Push event into queue for obj " << QString("%1 event %2").arg(obj->getName()).arg(event);
|
||||
ObjectQueueInfo objInfo;
|
||||
objInfo.obj = obj;
|
||||
objInfo.event = event;
|
||||
@ -349,15 +353,8 @@ void Telemetry::processObjectUpdates(UAVObject* obj, EventMask event, bool allIn
|
||||
}
|
||||
}
|
||||
|
||||
// If there is no transaction in progress then process event
|
||||
if (!transPending)
|
||||
{
|
||||
// qDebug() << "No transaction pending, process object queue...";
|
||||
processObjectQueue();
|
||||
} else
|
||||
{
|
||||
// qDebug() << "Transaction pending, DO NOT process object queue...";
|
||||
}
|
||||
// Process the transaction
|
||||
processObjectQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -365,14 +362,7 @@ void Telemetry::processObjectUpdates(UAVObject* obj, EventMask event, bool allIn
|
||||
*/
|
||||
void Telemetry::processObjectQueue()
|
||||
{
|
||||
// qDebug() << "Process object queue " << tr("- Depth (%1 %2)").arg(objQueue.length()).arg(objPriorityQueue.length());
|
||||
|
||||
// Don nothing if a transaction is already in progress (should not happen)
|
||||
if (transPending)
|
||||
{
|
||||
qxtLog->error("Telemetry: Dequeue while a transaction pending!");
|
||||
return;
|
||||
}
|
||||
//qDebug() << "Process object queue " << tr("- Depth (%1 %2)").arg(objQueue.length()).arg(objPriorityQueue.length());
|
||||
|
||||
// Get object information from queue (first the priority and then the regular queue)
|
||||
ObjectQueueInfo objInfo;
|
||||
@ -408,24 +398,26 @@ void Telemetry::processObjectQueue()
|
||||
if ( ( objInfo.event != EV_UNPACKED ) && ( ( objInfo.event != EV_UPDATED_PERIODIC ) || ( updateMode != UAVObject::UPDATEMODE_THROTTLED ) ) )
|
||||
{
|
||||
UAVObject::Metadata metadata = objInfo.obj->getMetadata();
|
||||
transInfo.obj = objInfo.obj;
|
||||
transInfo.allInstances = objInfo.allInstances;
|
||||
transInfo.retriesRemaining = MAX_RETRIES;
|
||||
transInfo.acked = UAVObject::GetGcsTelemetryAcked(metadata);
|
||||
ObjectTransactionInfo *transInfo = new ObjectTransactionInfo();
|
||||
transInfo->obj = objInfo.obj;
|
||||
transInfo->allInstances = objInfo.allInstances;
|
||||
transInfo->retriesRemaining = MAX_RETRIES;
|
||||
transInfo->acked = UAVObject::GetGcsTelemetryAcked(metadata);
|
||||
if ( objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL || objInfo.event == EV_UPDATED_PERIODIC )
|
||||
{
|
||||
transInfo.objRequest = false;
|
||||
transInfo->objRequest = false;
|
||||
}
|
||||
else if ( objInfo.event == EV_UPDATE_REQ )
|
||||
{
|
||||
transInfo.objRequest = true;
|
||||
transInfo->objRequest = true;
|
||||
}
|
||||
// Start transaction
|
||||
transPending = true;
|
||||
processObjectTransaction();
|
||||
transInfo->telem = this;
|
||||
// Insert the transaction into the transaction map.
|
||||
transMap.insert(objInfo.obj->getObjID(), transInfo);
|
||||
processObjectTransaction(transInfo);
|
||||
} else
|
||||
{
|
||||
// qDebug() << QString("Process object queue: this is an unpack event for %1").arg(objInfo.obj->getName());
|
||||
qDebug() << QString("Process object queue: this is an unpack event for %1").arg(objInfo.obj->getName());
|
||||
}
|
||||
|
||||
// If this is a metaobject then make necessary telemetry updates
|
||||
@ -579,6 +571,29 @@ void Telemetry::newInstance(UAVObject* obj)
|
||||
registerObject(obj);
|
||||
}
|
||||
|
||||
ObjectTransactionInfo::ObjectTransactionInfo()
|
||||
{
|
||||
obj = 0;
|
||||
allInstances = false;
|
||||
objRequest = false;
|
||||
retriesRemaining = 0;
|
||||
acked = false;
|
||||
telem = 0;
|
||||
// Setup transaction timer
|
||||
timer = new QTimer(this);
|
||||
timer->stop();
|
||||
connect(timer, SIGNAL(timeout()), this, SLOT(timeout()));
|
||||
}
|
||||
|
||||
ObjectTransactionInfo::~ObjectTransactionInfo()
|
||||
{
|
||||
telem = 0;
|
||||
timer->stop();
|
||||
delete timer;
|
||||
}
|
||||
|
||||
|
||||
void ObjectTransactionInfo::timeout()
|
||||
{
|
||||
if (telem)
|
||||
telem->transactionTimeout(this);
|
||||
}
|
||||
|
@ -35,6 +35,24 @@
|
||||
#include <QMutexLocker>
|
||||
#include <QTimer>
|
||||
#include <QQueue>
|
||||
#include <QMap>
|
||||
|
||||
class ObjectTransactionInfo: public QObject {
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
ObjectTransactionInfo();
|
||||
~ObjectTransactionInfo();
|
||||
UAVObject* obj;
|
||||
bool allInstances;
|
||||
bool objRequest;
|
||||
qint32 retriesRemaining;
|
||||
bool acked;
|
||||
class Telemetry* telem;
|
||||
QTimer* timer;
|
||||
private slots:
|
||||
void timeout();
|
||||
};
|
||||
|
||||
class Telemetry: public QObject
|
||||
{
|
||||
@ -54,24 +72,13 @@ public:
|
||||
} TelemetryStats;
|
||||
|
||||
Telemetry(UAVTalk* utalk, UAVObjectManager* objMngr);
|
||||
~Telemetry();
|
||||
TelemetryStats getStats();
|
||||
void resetStats();
|
||||
|
||||
bool transactionTimeout(ObjectTransactionInfo *info);
|
||||
|
||||
signals:
|
||||
|
||||
private slots:
|
||||
void objectUpdatedAuto(UAVObject* obj);
|
||||
void objectUpdatedManual(UAVObject* obj);
|
||||
void objectUpdatedPeriodic(UAVObject* obj);
|
||||
void objectUnpacked(UAVObject* obj);
|
||||
void updateRequested(UAVObject* obj);
|
||||
void newObject(UAVObject* obj);
|
||||
void newInstance(UAVObject* obj);
|
||||
void processPeriodicUpdates();
|
||||
void transactionCompleted(UAVObject* obj, bool success);
|
||||
void transactionTimeout();
|
||||
|
||||
private:
|
||||
// Constants
|
||||
static const int REQ_TIMEOUT_MS = 250;
|
||||
@ -105,14 +112,6 @@ private:
|
||||
bool allInstances;
|
||||
} ObjectQueueInfo;
|
||||
|
||||
typedef struct {
|
||||
UAVObject* obj;
|
||||
bool allInstances;
|
||||
bool objRequest;
|
||||
qint32 retriesRemaining;
|
||||
bool acked;
|
||||
} ObjectTransactionInfo;
|
||||
|
||||
// Variables
|
||||
UAVObjectManager* objMngr;
|
||||
UAVTalk* utalk;
|
||||
@ -120,11 +119,9 @@ private:
|
||||
QList<ObjectTimeInfo> objList;
|
||||
QQueue<ObjectQueueInfo> objQueue;
|
||||
QQueue<ObjectQueueInfo> objPriorityQueue;
|
||||
ObjectTransactionInfo transInfo;
|
||||
bool transPending;
|
||||
QMap<quint32, ObjectTransactionInfo*>transMap;
|
||||
QMutex* mutex;
|
||||
QTimer* updateTimer;
|
||||
QTimer* transTimer;
|
||||
QTimer* statsTimer;
|
||||
qint32 timeToNextUpdateMs;
|
||||
quint32 txErrors;
|
||||
@ -137,9 +134,19 @@ private:
|
||||
void connectToObjectInstances(UAVObject* obj, quint32 eventMask);
|
||||
void updateObject(UAVObject* obj, quint32 eventMask);
|
||||
void processObjectUpdates(UAVObject* obj, EventMask event, bool allInstances, bool priority);
|
||||
void processObjectTransaction();
|
||||
void processObjectTransaction(ObjectTransactionInfo *transInfo);
|
||||
void processObjectQueue();
|
||||
|
||||
private slots:
|
||||
void objectUpdatedAuto(UAVObject* obj);
|
||||
void objectUpdatedManual(UAVObject* obj);
|
||||
void objectUpdatedPeriodic(UAVObject* obj);
|
||||
void objectUnpacked(UAVObject* obj);
|
||||
void updateRequested(UAVObject* obj);
|
||||
void newObject(UAVObject* obj);
|
||||
void newInstance(UAVObject* obj);
|
||||
void processPeriodicUpdates();
|
||||
void transactionCompleted(UAVObject* obj, bool success);
|
||||
|
||||
};
|
||||
|
||||
|
@ -72,8 +72,6 @@ UAVTalk::UAVTalk(QIODevice* iodev, UAVObjectManager* objMngr)
|
||||
|
||||
mutex = new QMutex(QMutex::Recursive);
|
||||
|
||||
respObj = NULL;
|
||||
|
||||
memset(&stats, 0, sizeof(ComStats));
|
||||
|
||||
connect(io, SIGNAL(readyRead()), this, SLOT(processInputStream()));
|
||||
@ -155,10 +153,16 @@ bool UAVTalk::sendObject(UAVObject* obj, bool acked, bool allInstances)
|
||||
/**
|
||||
* Cancel a pending transaction
|
||||
*/
|
||||
void UAVTalk::cancelTransaction()
|
||||
void UAVTalk::cancelTransaction(UAVObject* obj)
|
||||
{
|
||||
QMutexLocker locker(mutex);
|
||||
respObj = NULL;
|
||||
quint32 objId = obj->getObjID();
|
||||
QMap<quint32, Transaction*>::iterator itr = transMap.find(objId);
|
||||
if ( itr != transMap.end() )
|
||||
{
|
||||
transMap.remove(objId);
|
||||
delete itr.value();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -178,8 +182,10 @@ bool UAVTalk::objectTransaction(UAVObject* obj, quint8 type, bool allInstances)
|
||||
{
|
||||
if ( transmitObject(obj, type, allInstances) )
|
||||
{
|
||||
respObj = obj;
|
||||
respAllInstances = allInstances;
|
||||
Transaction *trans = new Transaction();
|
||||
trans->obj = obj;
|
||||
trans->allInstances = allInstances;
|
||||
transMap.insert(obj->getObjID(), trans);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
@ -625,9 +631,12 @@ UAVObject* UAVTalk::updateObject(quint32 objId, quint16 instId, quint8* data)
|
||||
*/
|
||||
void UAVTalk::updateNack(UAVObject* obj)
|
||||
{
|
||||
if (respObj != NULL && respObj->getObjID() == obj->getObjID() && (respObj->getInstID() == obj->getInstID() || respAllInstances))
|
||||
quint32 objId = obj->getObjID();
|
||||
QMap<quint32, Transaction*>::iterator itr = transMap.find(objId);
|
||||
if ( itr != transMap.end() && (itr.value()->obj->getInstID() == obj->getInstID() || itr.value()->allInstances))
|
||||
{
|
||||
respObj = NULL;
|
||||
transMap.remove(objId);
|
||||
delete itr.value();
|
||||
emit transactionCompleted(obj, false);
|
||||
}
|
||||
}
|
||||
@ -638,9 +647,12 @@ void UAVTalk::updateNack(UAVObject* obj)
|
||||
*/
|
||||
void UAVTalk::updateAck(UAVObject* obj)
|
||||
{
|
||||
if (respObj != NULL && respObj->getObjID() == obj->getObjID() && (respObj->getInstID() == obj->getInstID() || respAllInstances))
|
||||
quint32 objId = obj->getObjID();
|
||||
QMap<quint32, Transaction*>::iterator itr = transMap.find(objId);
|
||||
if ( itr != transMap.end() && (itr.value()->obj->getInstID() == obj->getInstID() || itr.value()->allInstances))
|
||||
{
|
||||
respObj = NULL;
|
||||
transMap.remove(objId);
|
||||
delete itr.value();
|
||||
emit transactionCompleted(obj, true);
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,7 @@
|
||||
#include <QIODevice>
|
||||
#include <QMutex>
|
||||
#include <QMutexLocker>
|
||||
#include <QMap>
|
||||
#include <QSemaphore>
|
||||
#include "uavobjectmanager.h"
|
||||
#include "uavtalk_global.h"
|
||||
@ -54,7 +55,7 @@ public:
|
||||
~UAVTalk();
|
||||
bool sendObject(UAVObject* obj, bool acked, bool allInstances);
|
||||
bool sendObjectRequest(UAVObject* obj, bool allInstances);
|
||||
void cancelTransaction();
|
||||
void cancelTransaction(UAVObject* obj);
|
||||
ComStats getStats();
|
||||
void resetStats();
|
||||
|
||||
@ -65,6 +66,12 @@ private slots:
|
||||
void processInputStream(void);
|
||||
|
||||
private:
|
||||
|
||||
typedef struct {
|
||||
UAVObject* obj;
|
||||
bool allInstances;
|
||||
} Transaction;
|
||||
|
||||
// Constants
|
||||
static const int TYPE_MASK = 0xF8;
|
||||
static const int TYPE_VER = 0x20;
|
||||
@ -96,8 +103,7 @@ private:
|
||||
QIODevice* io;
|
||||
UAVObjectManager* objMngr;
|
||||
QMutex* mutex;
|
||||
UAVObject* respObj;
|
||||
bool respAllInstances;
|
||||
QMap<quint32, Transaction*> transMap;
|
||||
quint8 rxBuffer[MAX_PACKET_LENGTH];
|
||||
quint8 txBuffer[MAX_PACKET_LENGTH];
|
||||
// Variables used by the receive state machine
|
||||
|
Loading…
x
Reference in New Issue
Block a user