mirror of https://bitbucket.org/librepilot/librepilot.git synced 2025-03-15 07:29:15 +01:00

Most of the work on Telemetry.java as well as lots of signals for various

object events
This commit is contained in:
James Cotton 2011-03-10 02:05:36 -06:00
parent 63f750c51e
commit 6bc97f1a3d
5 changed files with 673 additions and 13 deletions

View File

@ -1,8 +1,18 @@
package org.openpilot.uavtalk;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Observable;
import java.util.Observer;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import org.openpilot.uavtalk.UAVObject.Acked;
public class Telemetry {
private TelemetryStats stats;
public class TelemetryStats {
public int txBytes;
public int rxBytes;
@ -15,11 +25,612 @@ public class Telemetry {
public int txRetries;
} ;
public TelemetryStats getStats() {
return stats;
class ObjectTimeInfo {
UAVObject obj;
int updatePeriodMs; /** Update period in ms or 0 if no periodic updates are needed */
int timeToNextUpdateMs; /** Time delay to the next update */
class ObjectQueueInfo {
UAVObject obj;
int event;
boolean allInstances;
class ObjectTransactionInfo {
UAVObject obj;
boolean allInstances;
boolean objRequest;
int retriesRemaining;
Acked acked;
} ;
public void resetStats() {
stats = new TelemetryStats();
* Events generated by objects. Not enum because used in mask.
private static final int EV_UNPACKED = 0x01; /** Object data updated by unpacking */
private static final int EV_UPDATED = 0x02; /** Object data updated by changing the data structure */
private static final int EV_UPDATED_MANUAL = 0x04; /** Object update event manually generated */
private static final int EV_UPDATE_REQ = 0x08; /** Request to update object data */
* Constructor
public Telemetry(UAVTalk utalk, UAVObjectManager objMngr)
this.utalk = utalk;
this.objMngr = objMngr;
// Process all objects in the list
List< List<UAVObject> > objs = objMngr.getObjects();
ListIterator<List<UAVObject>> li = objs.listIterator();
registerObject(li.next().get(0)); // we only need to register one instance per object type
// Listen to new object creations
objMngr.addNewInstanceObserver(new Observer() {
public void update(Observable observable, Object data) {
newInstance((UAVObject) data);
objMngr.addNewObjectObserver(new Observer() {
public void update(Observable observable, Object data) {
newObject((UAVObject) data);
// Listen to transaction completions
utalk.addObserver(new Observer() {
public void update(Observable observable, Object data) {
transactionCompleted((UAVObject) data);
// Get GCS stats object
gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
// Setup transaction timer
transPending = false;
transTimer = new Timer();
transTimerTask = new TimerTask() {
public void run() {
// Setup and start the periodic timer
timeToNextUpdateMs = 0;
updateTimer = new Timer();
updateTimerTask = new TimerTask() {
public void run() {
updateTimer.scheduleAtFixedRate(updateTimerTask, 1000, 1000);
// Setup and start the stats timer
txErrors = 0;
txRetries = 0;
* Register a new object for periodic updates (if enabled)
private void registerObject(UAVObject obj)
// Setup object for periodic updates
// Setup object for telemetry updates
* Add an object in the list used for periodic updates
private void addObject(UAVObject obj)
// Check if object type is already in the list
ListIterator<ObjectTimeInfo> li = objList.listIterator();
while(li.hasNext()) {
ObjectTimeInfo n = li.next();
if( n.obj.getObjID() == obj.getObjID() )
// Object type (not instance!) is already in the list, do nothing
// If this point is reached, then the object type is new, let's add it
ObjectTimeInfo timeInfo = new ObjectTimeInfo();
timeInfo.obj = obj;
timeInfo.timeToNextUpdateMs = 0;
timeInfo.updatePeriodMs = 0;
* Update the object's timers
private void setUpdatePeriod(UAVObject obj, int periodMs)
// Find object type (not instance!) and update its period
ListIterator<ObjectTimeInfo> li = objList.listIterator();
while(li.hasNext()) {
ObjectTimeInfo n = li.next();
if ( n.obj.getObjID() == obj.getObjID() )
n.updatePeriodMs = periodMs;
n.timeToNextUpdateMs = (int) (periodMs * (new java.util.Random()).nextDouble()); // avoid bunching of updates
* Connect to all instances of an object depending on the event mask specified
private void connectToObjectInstances(UAVObject obj, int eventMask)
List<UAVObject> objs = objMngr.getObjectInstances(obj.getObjID());
ListIterator<UAVObject> li = objs.listIterator();
obj = li.next();
//TODO: Disconnect all
// obj.disconnect(this);
// Connect only the selected events
if ( (eventMask&EV_UNPACKED) != 0)
obj.addUnpackedObserver(new Observer() {
public void update(Observable observable, Object data) {
objectUnpacked( (UAVObject) data);
if ( (eventMask&EV_UPDATED) != 0)
obj.addUpdatedAutoObserver(new Observer() {
public void update(Observable observable, Object data) {
objectUpdatedAuto( (UAVObject) data);
if ( (eventMask&EV_UPDATED_MANUAL) != 0)
obj.addUpdatedManualObserver(new Observer() {
public void update(Observable observable, Object data) {
objectUpdatedManual( (UAVObject) data);
if ( (eventMask&EV_UPDATE_REQ) != 0)
obj.addUpdatedObserver(new Observer() {
public void update(Observable observable, Object data) {
updateRequested( (UAVObject) data);
* Update an object based on its metadata properties
private void updateObject(UAVObject obj)
// Get metadata
UAVObject.Metadata metadata = obj.getMetadata();
// Setup object depending on update mode
int eventMask;
if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_PERIODIC )
// Set update period
setUpdatePeriod(obj, metadata.gcsTelemetryUpdatePeriod);
// Connect signals for all instances
eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events)
connectToObjectInstances(obj, eventMask);
else if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_ONCHANGE )
// Set update period
setUpdatePeriod(obj, 0);
// Connect signals for all instances
eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events)
connectToObjectInstances(obj, eventMask);
else if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_MANUAL )
// Set update period
setUpdatePeriod(obj, 0);
// Connect signals for all instances
eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events)
connectToObjectInstances(obj, eventMask);
else if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_NEVER )
// Set update period
setUpdatePeriod(obj, 0);
// Disconnect from object
connectToObjectInstances(obj, 0);
* Called when a transaction is successfully completed (uavtalk event)
private void transactionCompleted(UAVObject obj)
// Check if there is a pending transaction and the objects match
if ( transPending && transInfo.obj.getObjID() == obj.getObjID() )
// qDebug() << QString("Telemetry: transaction completed for %1").arg(obj->getName());
// Complete transaction
transPending = false;
// Send signal
// Process new object updates from queue
} else
// qDebug() << "Error: received a transaction completed when did not expect it.";
* Called when a transaction is not completed within the timeout period (timer event)
private void transactionTimeout()
// qDebug() << "Telemetry: transaction timeout.";
// Proceed only if there is a pending transaction
if ( transPending )
// Check if more retries are pending
if (transInfo.retriesRemaining > 0)
// Terminate transaction
transPending = false;
// Send signal
// Process new object updates from queue
* Start an object transaction with UAVTalk, all information is stored in transInfo
private void processObjectTransaction()
if (transPending)
// qDebug() << tr("Process Object transaction for %1").arg(transInfo.obj->getName());
// Initiate transaction
if (transInfo.objRequest)
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
utalk.sendObject(transInfo.obj, transInfo.acked == Acked.TRUE, transInfo.allInstances);
// Start timer if a response is expected
if ( transInfo.objRequest || transInfo.acked == Acked.TRUE )
transTimer.scheduleAtFixedRate(transTimerTask, REQ_TIMEOUT_MS, REQ_TIMEOUT_MS);
transPending = false;
} else
// qDebug() << "Error: inside of processObjectTransaction with no transPending";
* Process the event received from an object
private void processObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority)
// Push event into queue
// qDebug() << "Push event into queue for obj " << QString("%1 event %2").arg(obj->getName()).arg(event);
ObjectQueueInfo objInfo = new ObjectQueueInfo();
objInfo.obj = obj;
objInfo.event = event;
objInfo.allInstances = allInstances;
if (priority)
if ( objPriorityQueue.size() < MAX_QUEUE_SIZE )
//qxtLog->warning(tr("Telemetry: priority event queue is full, event lost (%1)").arg(obj->getName()));
if ( objQueue.size() < MAX_QUEUE_SIZE )
// If there is no transaction in progress then process event
if (!transPending)
// qDebug() << "No transaction pending, process object queue...";
} else
// qDebug() << "Transaction pending, DO NOT process object queue...";
* Process events from the object queue
private void processObjectQueue()
// qDebug() << "Process object queue " << tr("- Depth (%1 %2)").arg(objQueue.length()).arg(objPriorityQueue.length());
// Don nothing if a transaction is already in progress (should not happen)
if (transPending)
// qxtLog->error("Telemetry: Dequeue while a transaction pending!");
// Get object information from queue (first the priority and then the regular queue)
ObjectQueueInfo objInfo;
if ( !objPriorityQueue.isEmpty() )
objInfo = objPriorityQueue.remove();
else if ( !objQueue.isEmpty() )
objInfo = objQueue.remove();
// Check if a connection has been established, only process GCSTelemetryStats updates
// (used to establish the connection)
if ( ((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0 )
if ( objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID() )
// Setup transaction (skip if unpack event)
if ( objInfo.event != EV_UNPACKED )
UAVObject.Metadata metadata = objInfo.obj.getMetadata();
transInfo.obj = objInfo.obj;
transInfo.allInstances = objInfo.allInstances;
transInfo.retriesRemaining = MAX_RETRIES;
transInfo.acked = metadata.gcsTelemetryAcked;
if ( objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL )
transInfo.objRequest = false;
else if ( objInfo.event == EV_UPDATE_REQ )
transInfo.objRequest = true;
// Start transaction
transPending = true;
} else
// qDebug() << QString("Process object queue: this is an unpack event for %1").arg(objInfo.obj->getName());
// If this is a metaobject then make necessary telemetry updates
if (objInfo.obj.isMetadata())
UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj;
updateObject( metaobj.getParentObject() );
// The fact we received an unpacked event does not mean that
// we do not have additional objects still in the queue,
// so we have to reschedule queue processing to make sure they are not
// stuck:
if ( objInfo.event == EV_UNPACKED )
* Check is any objects are pending for periodic updates
* TODO: Clean-up
private synchronized void processPeriodicUpdates()
// Stop timer
// Iterate through each object and update its timer, if zero then transmit object.
// Also calculate smallest delay to next update (will be used for setting timeToNextUpdateMs)
int minDelay = MAX_UPDATE_PERIOD_MS;
ObjectTimeInfo objinfo;
int elapsedMs = 0;
long startTime;
int offset;
ListIterator<ObjectTimeInfo> li = objList.listIterator();
objinfo = li.next();
// If object is configured for periodic updates
if (objinfo.updatePeriodMs > 0)
objinfo.timeToNextUpdateMs -= timeToNextUpdateMs;
// Check if time for the next update
if (objinfo.timeToNextUpdateMs <= 0)
// Reset timer
offset = (-objinfo.timeToNextUpdateMs) % objinfo.updatePeriodMs;
objinfo.timeToNextUpdateMs = objinfo.updatePeriodMs - offset;
// Send object
startTime = System.currentTimeMillis();
processObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false);
elapsedMs = (int) (System.currentTimeMillis() - startTime);
// Update timeToNextUpdateMs with the elapsed delay of sending the object;
timeToNextUpdateMs += elapsedMs;
// Update minimum delay
if (objinfo.timeToNextUpdateMs < minDelay)
minDelay = objinfo.timeToNextUpdateMs;
// Check if delay for the next update is too short
if (minDelay < MIN_UPDATE_PERIOD_MS)
// Done
timeToNextUpdateMs = minDelay;
// Restart timer
updateTimer.scheduleAtFixedRate(updateTimerTask, timeToNextUpdateMs, timeToNextUpdateMs);
public TelemetryStats getStats()
// Get UAVTalk stats
UAVTalk.ComStats utalkStats = utalk.getStats();
// Update stats
TelemetryStats stats = new TelemetryStats();
stats.txBytes = utalkStats.txBytes;
stats.rxBytes = utalkStats.rxBytes;
stats.txObjectBytes = utalkStats.txObjectBytes;
stats.rxObjectBytes = utalkStats.rxObjectBytes;
stats.rxObjects = utalkStats.rxObjects;
stats.txObjects = utalkStats.txObjects;
stats.txErrors = utalkStats.txErrors + txErrors;
stats.rxErrors = utalkStats.rxErrors;
stats.txRetries = txRetries;
// Done
return stats;
public synchronized void resetStats()
txErrors = 0;
txRetries = 0;
private synchronized void objectUpdatedAuto(UAVObject obj)
processObjectUpdates(obj, EV_UPDATED, false, true);
private synchronized void objectUpdatedManual(UAVObject obj)
processObjectUpdates(obj, EV_UPDATED_MANUAL, false, true);
private synchronized void objectUnpacked(UAVObject obj)
processObjectUpdates(obj, EV_UNPACKED, false, true);
private synchronized void updateRequested(UAVObject obj)
processObjectUpdates(obj, EV_UPDATE_REQ, false, true);
private void newObject(UAVObject obj)
private synchronized void newInstance(UAVObject obj)
* Private variables
private TelemetryStats stats;
private UAVObjectManager objMngr;
private UAVTalk utalk;
private UAVObject gcsStatsObj;
private List<ObjectTimeInfo> objList;
private Queue<ObjectQueueInfo> objQueue = new LinkedList<ObjectQueueInfo>();
private Queue<ObjectQueueInfo> objPriorityQueue = new LinkedList<ObjectQueueInfo>();
private ObjectTransactionInfo transInfo;
private boolean transPending;
private Timer updateTimer;
private TimerTask updateTimerTask;
private Timer transTimer;
private TimerTask transTimerTask;
private int timeToNextUpdateMs;
private int txErrors;
private int txRetries;
* Private constants
private static final int REQ_TIMEOUT_MS = 250;
private static final int MAX_RETRIES = 2;
private static final int MAX_UPDATE_PERIOD_MS = 1000;
private static final int MIN_UPDATE_PERIOD_MS = 1;
private static final int MAX_QUEUE_SIZE = 20;

View File

@ -123,7 +123,7 @@ public class TelemetryMonitor {
// Connect to object
//connect(obj, SIGNAL(transactionCompleted(UAVObject*,bool)), this, SLOT(transactionCompleted(UAVObject*,bool)));
// Request update
objPending = obj;

View File

@ -8,7 +8,7 @@ import java.util.Observer;
import java.util.Observable;
public abstract class UAVObject {
public class CallbackListener extends Observable {
private UAVObject parent;
@ -20,6 +20,31 @@ public abstract class UAVObject {
public void event (Object data) {
public class TransactionResult {
public UAVObject obj;
public boolean success;
public TransactionResult(UAVObject obj, boolean success) {
this.obj = obj;
this.success = success;
private CallbackListener transactionCompletedListeners = new CallbackListener(this);
public void addTransactionCompleted(Observer o) {
synchronized(transactionCompletedListeners) {
void transactionCompleted(boolean status) {
synchronized(transactionCompletedListeners) {
transactionCompletedListeners.event(new TransactionResult(this,status));
private CallbackListener updatedListeners = new CallbackListener(this);
@ -47,6 +72,30 @@ public abstract class UAVObject {
private CallbackListener updatedAutoListeners = new CallbackListener(this);
public void addUpdatedAutoObserver(Observer o) {
synchronized(updatedAutoListeners) {
void updatedAuto() {
synchronized(updatedAutoListeners) {
private CallbackListener updatedManualListeners = new CallbackListener(this);
public void addUpdatedManualObserver(Observer o) {
synchronized(updatedManualListeners) {
void updatedManual() {
synchronized(updatedManualListeners) {
public abstract boolean isMetadata();

View File

@ -102,7 +102,6 @@ public class UAVObjectManager {
//emit new instance signal
@ -155,7 +154,7 @@ public class UAVObjectManager {
List<UAVObject> ls = new ArrayList<UAVObject>();
//emit newObject(obj);

View File

@ -7,8 +7,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Observable;
public class UAVTalk {
public class UAVTalk extends Observable{
private Thread inputProcessingThread = null;
@ -632,8 +633,8 @@ public class UAVTalk {
if (respObj != null && respObj.getObjID() == obj.getObjID() && (respObj.getInstID() == obj.getInstID() || respAllInstances))
respObj = null;
// TODO: Signals
// emit transactionCompleted(obj);