From cb5e690de058f077c997432446d299a0d012cd4b Mon Sep 17 00:00:00 2001 From: James Cotton Date: Thu, 10 Mar 2011 02:05:36 -0600 Subject: [PATCH] Most of the work on Telemetry.java as well as lots of signals for various object events --- .../src/org/openpilot/uavtalk/Telemetry.java | 623 +++++++++++++++++- .../openpilot/uavtalk/TelemetryMonitor.java | 2 +- .../src/org/openpilot/uavtalk/UAVObject.java | 51 +- .../openpilot/uavtalk/UAVObjectManager.java | 3 +- .../src/org/openpilot/uavtalk/UAVTalk.java | 7 +- 5 files changed, 673 insertions(+), 13 deletions(-) diff --git a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java index 88bc33de5..79e821fe3 100644 --- a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java +++ b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java @@ -1,8 +1,18 @@ package org.openpilot.uavtalk; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Observable; +import java.util.Observer; +import java.util.Queue; +import java.util.Timer; +import java.util.TimerTask; + +import org.openpilot.uavtalk.UAVObject.Acked; + public class Telemetry { - private TelemetryStats stats; public class TelemetryStats { public int txBytes; public int rxBytes; @@ -15,11 +25,612 @@ public class Telemetry { public int txRetries; } ; - public TelemetryStats getStats() { - return stats; - } + class ObjectTimeInfo { + UAVObject obj; + int updatePeriodMs; /** Update period in ms or 0 if no periodic updates are needed */ + int timeToNextUpdateMs; /** Time delay to the next update */ + }; + + class ObjectQueueInfo { + UAVObject obj; + int event; + boolean allInstances; + }; + + class ObjectTransactionInfo { + UAVObject obj; + boolean allInstances; + boolean objRequest; + int retriesRemaining; + Acked acked; + } ; - public void resetStats() { - stats = new TelemetryStats(); + /** + * Events generated by objects. Not enum because used in mask. + */ + private static final int EV_UNPACKED = 0x01; /** Object data updated by unpacking */ + private static final int EV_UPDATED = 0x02; /** Object data updated by changing the data structure */ + private static final int EV_UPDATED_MANUAL = 0x04; /** Object update event manually generated */ + private static final int EV_UPDATE_REQ = 0x08; /** Request to update object data */ + + /** + * Constructor + */ + public Telemetry(UAVTalk utalk, UAVObjectManager objMngr) + { + this.utalk = utalk; + this.objMngr = objMngr; + + // Process all objects in the list + List< List > objs = objMngr.getObjects(); + ListIterator> li = objs.listIterator(); + while(li.hasNext()) + registerObject(li.next().get(0)); // we only need to register one instance per object type + + // Listen to new object creations + objMngr.addNewInstanceObserver(new Observer() { + public void update(Observable observable, Object data) { + newInstance((UAVObject) data); + } + }); + objMngr.addNewObjectObserver(new Observer() { + public void update(Observable observable, Object data) { + newObject((UAVObject) data); + } + }); + + // Listen to transaction completions + utalk.addObserver(new Observer() { + public void update(Observable observable, Object data) { + transactionCompleted((UAVObject) data); + } + }); + + // Get GCS stats object + gcsStatsObj = objMngr.getObject("GCSTelemetryStats"); + + // Setup transaction timer + transPending = false; + transTimer = new Timer(); + transTimerTask = new TimerTask() { + @Override + public void run() { + transactionTimeout(); + } + }; + // Setup and start the periodic timer + timeToNextUpdateMs = 0; + + updateTimer = new Timer(); + updateTimerTask = new TimerTask() { + @Override + public void run() { + processPeriodicUpdates(); + } + }; + updateTimer.scheduleAtFixedRate(updateTimerTask, 1000, 1000); + // Setup and start the stats timer + txErrors = 0; + txRetries = 0; } + + /** + * Register a new object for periodic updates (if enabled) + */ + private void registerObject(UAVObject obj) + { + // Setup object for periodic updates + addObject(obj); + + // Setup object for telemetry updates + updateObject(obj); + } + + /** + * Add an object in the list used for periodic updates + */ + private void addObject(UAVObject obj) + { + // Check if object type is already in the list + ListIterator li = objList.listIterator(); + while(li.hasNext()) { + ObjectTimeInfo n = li.next(); + if( n.obj.getObjID() == obj.getObjID() ) + { + // Object type (not instance!) is already in the list, do nothing + return; + } + } + + // If this point is reached, then the object type is new, let's add it + ObjectTimeInfo timeInfo = new ObjectTimeInfo(); + timeInfo.obj = obj; + timeInfo.timeToNextUpdateMs = 0; + timeInfo.updatePeriodMs = 0; + objList.add(timeInfo); + } + + /** + * Update the object's timers + */ + private void setUpdatePeriod(UAVObject obj, int periodMs) + { + // Find object type (not instance!) and update its period + ListIterator li = objList.listIterator(); + while(li.hasNext()) { + ObjectTimeInfo n = li.next(); + if ( n.obj.getObjID() == obj.getObjID() ) + { + n.updatePeriodMs = periodMs; + n.timeToNextUpdateMs = (int) (periodMs * (new java.util.Random()).nextDouble()); // avoid bunching of updates + } + } + } + + /** + * Connect to all instances of an object depending on the event mask specified + */ + private void connectToObjectInstances(UAVObject obj, int eventMask) + { + List objs = objMngr.getObjectInstances(obj.getObjID()); + ListIterator li = objs.listIterator(); + while(li.hasNext()) + { + obj = li.next(); + //TODO: Disconnect all + // obj.disconnect(this); + + // Connect only the selected events + if ( (eventMask&EV_UNPACKED) != 0) + { + obj.addUnpackedObserver(new Observer() { + public void update(Observable observable, Object data) { + objectUnpacked( (UAVObject) data); + } + }); + } + if ( (eventMask&EV_UPDATED) != 0) + { + obj.addUpdatedAutoObserver(new Observer() { + public void update(Observable observable, Object data) { + objectUpdatedAuto( (UAVObject) data); + } + }); + } + if ( (eventMask&EV_UPDATED_MANUAL) != 0) + { + obj.addUpdatedManualObserver(new Observer() { + public void update(Observable observable, Object data) { + objectUpdatedManual( (UAVObject) data); + } + }); + } + if ( (eventMask&EV_UPDATE_REQ) != 0) + { + obj.addUpdatedObserver(new Observer() { + public void update(Observable observable, Object data) { + updateRequested( (UAVObject) data); + } + }); + } + } + } + + /** + * Update an object based on its metadata properties + */ + private void updateObject(UAVObject obj) + { + // Get metadata + UAVObject.Metadata metadata = obj.getMetadata(); + + // Setup object depending on update mode + int eventMask; + if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_PERIODIC ) + { + // Set update period + setUpdatePeriod(obj, metadata.gcsTelemetryUpdatePeriod); + // Connect signals for all instances + eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ; + if(obj.isMetadata()) + eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events) + + connectToObjectInstances(obj, eventMask); + } + else if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_ONCHANGE ) + { + // Set update period + setUpdatePeriod(obj, 0); + // Connect signals for all instances + eventMask = EV_UPDATED | EV_UPDATED_MANUAL | EV_UPDATE_REQ; + if(obj.isMetadata()) + eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events) + + connectToObjectInstances(obj, eventMask); + } + else if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_MANUAL ) + { + // Set update period + setUpdatePeriod(obj, 0); + // Connect signals for all instances + eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ; + if(obj.isMetadata()) + eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events) + + connectToObjectInstances(obj, eventMask); + } + else if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_NEVER ) + { + // Set update period + setUpdatePeriod(obj, 0); + // Disconnect from object + connectToObjectInstances(obj, 0); + } + } + + /** + * Called when a transaction is successfully completed (uavtalk event) + */ + private void transactionCompleted(UAVObject obj) + { + // Check if there is a pending transaction and the objects match + if ( transPending && transInfo.obj.getObjID() == obj.getObjID() ) + { + // qDebug() << QString("Telemetry: transaction completed for %1").arg(obj->getName()); + // Complete transaction + transTimer.cancel(); + transPending = false; + // Send signal + obj.transactionCompleted(true); + // Process new object updates from queue + processObjectQueue(); + } else + { + // qDebug() << "Error: received a transaction completed when did not expect it."; + } + } + + /** + * Called when a transaction is not completed within the timeout period (timer event) + */ + private void transactionTimeout() + { +// qDebug() << "Telemetry: transaction timeout."; + transTimer.cancel(); + // Proceed only if there is a pending transaction + if ( transPending ) + { + // Check if more retries are pending + if (transInfo.retriesRemaining > 0) + { + --transInfo.retriesRemaining; + processObjectTransaction(); + ++txRetries; + } + else + { + // Terminate transaction + utalk.cancelTransaction(); + transPending = false; + // Send signal + transInfo.obj.transactionCompleted(false); + // Process new object updates from queue + processObjectQueue(); + ++txErrors; + } + } + } + + /** + * Start an object transaction with UAVTalk, all information is stored in transInfo + */ + private void processObjectTransaction() + { + if (transPending) + { + // qDebug() << tr("Process Object transaction for %1").arg(transInfo.obj->getName()); + // Initiate transaction + if (transInfo.objRequest) + { + utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); + } + else + { + utalk.sendObject(transInfo.obj, transInfo.acked == Acked.TRUE, transInfo.allInstances); + } + // Start timer if a response is expected + if ( transInfo.objRequest || transInfo.acked == Acked.TRUE ) + { + transTimer.scheduleAtFixedRate(transTimerTask, REQ_TIMEOUT_MS, REQ_TIMEOUT_MS); + } + else + { + transTimer.cancel(); + transPending = false; + } + } else + { + // qDebug() << "Error: inside of processObjectTransaction with no transPending"; + } + } + + /** + * Process the event received from an object + */ + private void processObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) + { + // Push event into queue +// qDebug() << "Push event into queue for obj " << QString("%1 event %2").arg(obj->getName()).arg(event); + ObjectQueueInfo objInfo = new ObjectQueueInfo(); + objInfo.obj = obj; + objInfo.event = event; + objInfo.allInstances = allInstances; + if (priority) + { + if ( objPriorityQueue.size() < MAX_QUEUE_SIZE ) + { + objPriorityQueue.add(objInfo); + } + else + { + ++txErrors; + obj.transactionCompleted(false); + //qxtLog->warning(tr("Telemetry: priority event queue is full, event lost (%1)").arg(obj->getName())); + } + } + else + { + if ( objQueue.size() < MAX_QUEUE_SIZE ) + { + objQueue.add(objInfo); + } + else + { + ++txErrors; + obj.transactionCompleted(false); + } + } + + // If there is no transaction in progress then process event + if (!transPending) + { + // qDebug() << "No transaction pending, process object queue..."; + processObjectQueue(); + } else + { + // qDebug() << "Transaction pending, DO NOT process object queue..."; + } + } + + /** + * Process events from the object queue + */ + private void processObjectQueue() + { + // qDebug() << "Process object queue " << tr("- Depth (%1 %2)").arg(objQueue.length()).arg(objPriorityQueue.length()); + + // Don nothing if a transaction is already in progress (should not happen) + if (transPending) + { +// qxtLog->error("Telemetry: Dequeue while a transaction pending!"); + return; + } + + // Get object information from queue (first the priority and then the regular queue) + ObjectQueueInfo objInfo; + if ( !objPriorityQueue.isEmpty() ) + { + objInfo = objPriorityQueue.remove(); + } + else if ( !objQueue.isEmpty() ) + { + objInfo = objQueue.remove(); + } + else + { + return; + } + + // Check if a connection has been established, only process GCSTelemetryStats updates + // (used to establish the connection) + if ( ((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0 ) + { + objQueue.clear(); + if ( objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID() ) + { + objInfo.obj.transactionCompleted(false); + return; + } + } + + // Setup transaction (skip if unpack event) + if ( objInfo.event != EV_UNPACKED ) + { + UAVObject.Metadata metadata = objInfo.obj.getMetadata(); + transInfo.obj = objInfo.obj; + transInfo.allInstances = objInfo.allInstances; + transInfo.retriesRemaining = MAX_RETRIES; + transInfo.acked = metadata.gcsTelemetryAcked; + if ( objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL ) + { + transInfo.objRequest = false; + } + else if ( objInfo.event == EV_UPDATE_REQ ) + { + transInfo.objRequest = true; + } + // Start transaction + transPending = true; + processObjectTransaction(); + } else + { +// qDebug() << QString("Process object queue: this is an unpack event for %1").arg(objInfo.obj->getName()); + } + + // If this is a metaobject then make necessary telemetry updates + if (objInfo.obj.isMetadata()) + { + UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj; + updateObject( metaobj.getParentObject() ); + } + + // The fact we received an unpacked event does not mean that + // we do not have additional objects still in the queue, + // so we have to reschedule queue processing to make sure they are not + // stuck: + if ( objInfo.event == EV_UNPACKED ) + processObjectQueue(); + + } + + /** + * Check is any objects are pending for periodic updates + * TODO: Clean-up + */ + private synchronized void processPeriodicUpdates() + { + // Stop timer + updateTimer.cancel(); + + // Iterate through each object and update its timer, if zero then transmit object. + // Also calculate smallest delay to next update (will be used for setting timeToNextUpdateMs) + int minDelay = MAX_UPDATE_PERIOD_MS; + ObjectTimeInfo objinfo; + int elapsedMs = 0; + long startTime; + int offset; + ListIterator li = objList.listIterator(); + while(li.hasNext()) + { + objinfo = li.next(); + // If object is configured for periodic updates + if (objinfo.updatePeriodMs > 0) + { + objinfo.timeToNextUpdateMs -= timeToNextUpdateMs; + // Check if time for the next update + if (objinfo.timeToNextUpdateMs <= 0) + { + // Reset timer + offset = (-objinfo.timeToNextUpdateMs) % objinfo.updatePeriodMs; + objinfo.timeToNextUpdateMs = objinfo.updatePeriodMs - offset; + // Send object + startTime = System.currentTimeMillis(); + processObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false); + elapsedMs = (int) (System.currentTimeMillis() - startTime); + // Update timeToNextUpdateMs with the elapsed delay of sending the object; + timeToNextUpdateMs += elapsedMs; + } + // Update minimum delay + if (objinfo.timeToNextUpdateMs < minDelay) + { + minDelay = objinfo.timeToNextUpdateMs; + } + } + } + + // Check if delay for the next update is too short + if (minDelay < MIN_UPDATE_PERIOD_MS) + { + minDelay = MIN_UPDATE_PERIOD_MS; + } + + // Done + timeToNextUpdateMs = minDelay; + + // Restart timer + //updateTimer->start(timeToNextUpdateMs); + updateTimer.scheduleAtFixedRate(updateTimerTask, timeToNextUpdateMs, timeToNextUpdateMs); + } + + public TelemetryStats getStats() + { + // Get UAVTalk stats + UAVTalk.ComStats utalkStats = utalk.getStats(); + + // Update stats + TelemetryStats stats = new TelemetryStats(); + stats.txBytes = utalkStats.txBytes; + stats.rxBytes = utalkStats.rxBytes; + stats.txObjectBytes = utalkStats.txObjectBytes; + stats.rxObjectBytes = utalkStats.rxObjectBytes; + stats.rxObjects = utalkStats.rxObjects; + stats.txObjects = utalkStats.txObjects; + stats.txErrors = utalkStats.txErrors + txErrors; + stats.rxErrors = utalkStats.rxErrors; + stats.txRetries = txRetries; + + // Done + return stats; + } + + public synchronized void resetStats() + { + utalk.resetStats(); + txErrors = 0; + txRetries = 0; + } + + private synchronized void objectUpdatedAuto(UAVObject obj) + { + processObjectUpdates(obj, EV_UPDATED, false, true); + } + + private synchronized void objectUpdatedManual(UAVObject obj) + { + processObjectUpdates(obj, EV_UPDATED_MANUAL, false, true); + } + + private synchronized void objectUnpacked(UAVObject obj) + { + processObjectUpdates(obj, EV_UNPACKED, false, true); + } + + private synchronized void updateRequested(UAVObject obj) + { + processObjectUpdates(obj, EV_UPDATE_REQ, false, true); + } + + private void newObject(UAVObject obj) + { + registerObject(obj); + } + + private synchronized void newInstance(UAVObject obj) + { + registerObject(obj); + } + + /** + * Private variables + */ + private TelemetryStats stats; + private UAVObjectManager objMngr; + private UAVTalk utalk; + private UAVObject gcsStatsObj; + private List objList; + private Queue objQueue = new LinkedList(); + private Queue objPriorityQueue = new LinkedList(); + private ObjectTransactionInfo transInfo; + private boolean transPending; + + private Timer updateTimer; + private TimerTask updateTimerTask; + private Timer transTimer; + private TimerTask transTimerTask; + + private int timeToNextUpdateMs; + private int txErrors; + private int txRetries; + + /** + * Private constants + */ + private static final int REQ_TIMEOUT_MS = 250; + private static final int MAX_RETRIES = 2; + private static final int MAX_UPDATE_PERIOD_MS = 1000; + private static final int MIN_UPDATE_PERIOD_MS = 1; + private static final int MAX_QUEUE_SIZE = 20; + + + } diff --git a/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java b/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java index 40fc220cc..39d332457 100644 --- a/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java +++ b/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java @@ -123,7 +123,7 @@ public class TelemetryMonitor { // Connect to object //connect(obj, SIGNAL(transactionCompleted(UAVObject*,bool)), this, SLOT(transactionCompleted(UAVObject*,bool))); // Request update - obj.requestUpdate(); + tel.requestUpdate(obj); objPending = obj; } diff --git a/androidgcs/src/org/openpilot/uavtalk/UAVObject.java b/androidgcs/src/org/openpilot/uavtalk/UAVObject.java index adfece03e..9e4f69a39 100644 --- a/androidgcs/src/org/openpilot/uavtalk/UAVObject.java +++ b/androidgcs/src/org/openpilot/uavtalk/UAVObject.java @@ -8,7 +8,7 @@ import java.util.Observer; import java.util.Observable; public abstract class UAVObject { - + public class CallbackListener extends Observable { private UAVObject parent; @@ -20,6 +20,31 @@ public abstract class UAVObject { setChanged(); notifyObservers(parent); } + public void event (Object data) { + setChanged(); + notifyObservers(data); + } + } + + public class TransactionResult { + public UAVObject obj; + public boolean success; + public TransactionResult(UAVObject obj, boolean success) { + this.obj = obj; + this.success = success; + } + } + + private CallbackListener transactionCompletedListeners = new CallbackListener(this); + public void addTransactionCompleted(Observer o) { + synchronized(transactionCompletedListeners) { + transactionCompletedListeners.addObserver(o); + } + } + void transactionCompleted(boolean status) { + synchronized(transactionCompletedListeners) { + transactionCompletedListeners.event(new TransactionResult(this,status)); + } } private CallbackListener updatedListeners = new CallbackListener(this); @@ -47,6 +72,30 @@ public abstract class UAVObject { } } + private CallbackListener updatedAutoListeners = new CallbackListener(this); + public void addUpdatedAutoObserver(Observer o) { + synchronized(updatedAutoListeners) { + updatedAutoListeners.addObserver(o); + } + } + void updatedAuto() { + synchronized(updatedAutoListeners) { + updatedAutoListeners.event(); + } + } + + private CallbackListener updatedManualListeners = new CallbackListener(this); + public void addUpdatedManualObserver(Observer o) { + synchronized(updatedManualListeners) { + updatedManualListeners.addObserver(o); + } + } + void updatedManual() { + synchronized(updatedManualListeners) { + updatedManualListeners.event(); + } + } + public abstract boolean isMetadata(); /** diff --git a/androidgcs/src/org/openpilot/uavtalk/UAVObjectManager.java b/androidgcs/src/org/openpilot/uavtalk/UAVObjectManager.java index 63f535eac..3ec588879 100644 --- a/androidgcs/src/org/openpilot/uavtalk/UAVObjectManager.java +++ b/androidgcs/src/org/openpilot/uavtalk/UAVObjectManager.java @@ -102,7 +102,6 @@ public class UAVObjectManager { newInstance.event(newObj); } obj.initialize(mobj); - //emit new instance signal instList.add(obj); newInstance.event(obj); @@ -155,7 +154,7 @@ public class UAVObjectManager { List ls = new ArrayList(); ls.add(obj); objects.add(ls); - //emit newObject(obj); + newObject.event(obj); } /** diff --git a/androidgcs/src/org/openpilot/uavtalk/UAVTalk.java b/androidgcs/src/org/openpilot/uavtalk/UAVTalk.java index 7ea9c7c60..847b7c425 100644 --- a/androidgcs/src/org/openpilot/uavtalk/UAVTalk.java +++ b/androidgcs/src/org/openpilot/uavtalk/UAVTalk.java @@ -7,8 +7,9 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.Observable; -public class UAVTalk { +public class UAVTalk extends Observable{ private Thread inputProcessingThread = null; /** @@ -632,8 +633,8 @@ public class UAVTalk { if (respObj != null && respObj.getObjID() == obj.getObjID() && (respObj.getInstID() == obj.getInstID() || respAllInstances)) { respObj = null; - // TODO: Signals -// emit transactionCompleted(obj); + setChanged(); + notifyObservers(obj); } }