diff --git a/ground/src/plugins/uavtalk/telemetry.cpp b/ground/src/plugins/uavtalk/telemetry.cpp index 27a1bde7e..fab487505 100644 --- a/ground/src/plugins/uavtalk/telemetry.cpp +++ b/ground/src/plugins/uavtalk/telemetry.cpp @@ -261,25 +261,60 @@ void Telemetry::processObjectTransaction() } else { - UAVObject::Metadata metadata = transInfo.obj->getMetadata(); - utalk->sendObject(transInfo.obj, metadata.gcsTelemetryAcked, transInfo.allInstances); + utalk->sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances); + } + // Start timer if a response is expected + if ( transInfo.objRequest || transInfo.acked ) + { + transTimer->start(REQ_TIMEOUT_MS); + } + else + { + transTimer->stop(); + transPending = false; } - // Start timer - transTimer->start(REQ_TIMEOUT_MS); } } /** * Process the event received from an object */ -void Telemetry::processObjectUpdates(UAVObject* obj, EventMask event, bool allInstances) +void Telemetry::processObjectUpdates(UAVObject* obj, EventMask event, bool allInstances, bool priority) { + // Check if queue is full + if ( objQueue.length() > MAX_QUEUE_SIZE ) + { + ++txErrors; + return; + } + // Push event into queue ObjectQueueInfo objInfo; objInfo.obj = obj; objInfo.event = event; objInfo.allInstances = allInstances; - objQueue.enqueue(objInfo); + if (priority) + { + if ( objPriorityQueue.length() < MAX_QUEUE_SIZE ) + { + objPriorityQueue.enqueue(objInfo); + } + else + { + ++txErrors; + } + } + else + { + if ( objQueue.length() < MAX_QUEUE_SIZE ) + { + objQueue.enqueue(objInfo); + } + else + { + ++txErrors; + } + } // If there is no transaction in progress then process event if (!transPending) @@ -293,30 +328,41 @@ void Telemetry::processObjectUpdates(UAVObject* obj, EventMask event, bool allIn */ void Telemetry::processObjectQueue() { - // If the queue is empty there is nothing to do - if (objQueue.isEmpty()) + // Get object information from queue (first the priority and then the regular queue) + ObjectQueueInfo objInfo; + if ( !objPriorityQueue.isEmpty() ) + { + objInfo = objPriorityQueue.dequeue(); + } + else if ( !objQueue.isEmpty() ) + { + objInfo = objQueue.dequeue(); + } + else { return; } - // Get updated object from the queue - ObjectQueueInfo objInfo = objQueue.dequeue(); - - // Setup transaction - transInfo.obj = objInfo.obj; - transInfo.allInstances = objInfo.allInstances; - transInfo.retriesRemaining = MAX_RETRIES; - if ( objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL ) + // Setup transaction (skip if unpack event) + if ( objInfo.event != EV_UNPACKED ) { - transInfo.objRequest = false; + UAVObject::Metadata metadata = objInfo.obj->getMetadata(); + transInfo.obj = objInfo.obj; + transInfo.allInstances = objInfo.allInstances; + transInfo.retriesRemaining = MAX_RETRIES; + transInfo.acked = metadata.gcsTelemetryAcked; + if ( objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL ) + { + transInfo.objRequest = false; + } + else if ( objInfo.event == EV_UPDATE_REQ ) + { + transInfo.objRequest = true; + } + // Start transaction + transPending = true; + processObjectTransaction(); } - else if ( objInfo.event == EV_UPDATE_REQ ) - { - transInfo.objRequest = true; - } - // Start transaction - transPending = true; - processObjectTransaction(); // If this is a metaobject then make necessary telemetry updates UAVMetaObject* metaobj = dynamic_cast(objInfo.obj); @@ -357,7 +403,7 @@ void Telemetry::processPeriodicUpdates() objinfo.timeToNextUpdateMs = objinfo.updatePeriodMs; // Send object time.start(); - processObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true); + processObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false); elapsedMs = time.elapsed(); // Update timeToNextUpdateMs with the elapsed delay of sending the object; timeToNextUpdateMs += elapsedMs; @@ -414,25 +460,25 @@ void Telemetry::processStatsUpdates() void Telemetry::objectUpdatedAuto(UAVObject* obj) { QMutexLocker locker(mutex); - processObjectUpdates(obj, EV_UPDATED, false); + processObjectUpdates(obj, EV_UPDATED, false, true); } void Telemetry::objectUpdatedManual(UAVObject* obj) { QMutexLocker locker(mutex); - processObjectUpdates(obj, EV_UPDATED_MANUAL, false); + processObjectUpdates(obj, EV_UPDATED_MANUAL, false, true); } void Telemetry::objectUnpacked(UAVObject* obj) { QMutexLocker locker(mutex); - processObjectUpdates(obj, EV_UNPACKED, false); + processObjectUpdates(obj, EV_UNPACKED, false, true); } void Telemetry::updateRequested(UAVObject* obj) { QMutexLocker locker(mutex); - processObjectUpdates(obj, EV_UPDATE_REQ, false); + processObjectUpdates(obj, EV_UPDATE_REQ, false, true); } void Telemetry::newObject(UAVObject* obj) diff --git a/ground/src/plugins/uavtalk/telemetry.h b/ground/src/plugins/uavtalk/telemetry.h index 8f0328fd1..1e1e1376f 100644 --- a/ground/src/plugins/uavtalk/telemetry.h +++ b/ground/src/plugins/uavtalk/telemetry.h @@ -65,6 +65,7 @@ private: static const int MAX_UPDATE_PERIOD_MS = 1000; static const int MIN_UPDATE_PERIOD_MS = 1; static const int STATS_UPDATE_PERIOD_MS = 5000; + static const int MAX_QUEUE_SIZE = 20; // Types /** @@ -94,6 +95,7 @@ private: bool allInstances; bool objRequest; qint32 retriesRemaining; + bool acked; } ObjectTransactionInfo; // Variables @@ -101,6 +103,7 @@ private: UAVTalk* utalk; QList objList; QQueue objQueue; + QQueue objPriorityQueue; ObjectTransactionInfo transInfo; bool transPending; QMutex* mutex; @@ -118,7 +121,7 @@ private: void setUpdatePeriod(UAVObject* obj, qint32 periodMs); void connectToObjectInstances(UAVObject* obj, quint32 eventMask); void updateObject(UAVObject* obj); - void processObjectUpdates(UAVObject* obj, EventMask event, bool allInstances); + void processObjectUpdates(UAVObject* obj, EventMask event, bool allInstances, bool priority); void processObjectTransaction(); void processObjectQueue(); diff --git a/ground/src/plugins/uavtalk/uavtalk.cpp b/ground/src/plugins/uavtalk/uavtalk.cpp index 3f39a1a8b..ac160c920 100644 --- a/ground/src/plugins/uavtalk/uavtalk.cpp +++ b/ground/src/plugins/uavtalk/uavtalk.cpp @@ -453,7 +453,7 @@ bool UAVTalk::transmitObject(UAVObject* obj, quint8 type, bool allInstances) // If all instances are requested on a single instance object it is an error if (allInstances && obj->isSingleInstance()) { - return false; + allInstances = false; } // Process message type