1
0
mirror of https://bitbucket.org/librepilot/librepilot.git synced 2025-01-17 02:52:12 +01:00

Most of the work on Telemetry.java as well as lots of signals for various

object events
This commit is contained in:
James Cotton 2011-03-10 02:05:36 -06:00
parent 3d7f4e2273
commit cb5e690de0
5 changed files with 673 additions and 13 deletions

View File

@ -1,8 +1,18 @@
package org.openpilot.uavtalk;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Observable;
import java.util.Observer;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import org.openpilot.uavtalk.UAVObject.Acked;
public class Telemetry {
private TelemetryStats stats;
public class TelemetryStats {
public int txBytes;
public int rxBytes;
@ -15,11 +25,612 @@ public class Telemetry {
public int txRetries;
} ;
public TelemetryStats getStats() {
return stats;
}
class ObjectTimeInfo {
UAVObject obj;
int updatePeriodMs; /** Update period in ms or 0 if no periodic updates are needed */
int timeToNextUpdateMs; /** Time delay to the next update */
};
class ObjectQueueInfo {
UAVObject obj;
int event;
boolean allInstances;
};
class ObjectTransactionInfo {
UAVObject obj;
boolean allInstances;
boolean objRequest;
int retriesRemaining;
Acked acked;
} ;
public void resetStats() {
stats = new TelemetryStats();
/**
* Events generated by objects. Not enum because used in mask.
*/
private static final int EV_UNPACKED = 0x01; /** Object data updated by unpacking */
private static final int EV_UPDATED = 0x02; /** Object data updated by changing the data structure */
private static final int EV_UPDATED_MANUAL = 0x04; /** Object update event manually generated */
private static final int EV_UPDATE_REQ = 0x08; /** Request to update object data */
/**
* Constructor
*/
public Telemetry(UAVTalk utalk, UAVObjectManager objMngr)
{
this.utalk = utalk;
this.objMngr = objMngr;
// Process all objects in the list
List< List<UAVObject> > objs = objMngr.getObjects();
ListIterator<List<UAVObject>> li = objs.listIterator();
while(li.hasNext())
registerObject(li.next().get(0)); // we only need to register one instance per object type
// Listen to new object creations
objMngr.addNewInstanceObserver(new Observer() {
public void update(Observable observable, Object data) {
newInstance((UAVObject) data);
}
});
objMngr.addNewObjectObserver(new Observer() {
public void update(Observable observable, Object data) {
newObject((UAVObject) data);
}
});
// Listen to transaction completions
utalk.addObserver(new Observer() {
public void update(Observable observable, Object data) {
transactionCompleted((UAVObject) data);
}
});
// Get GCS stats object
gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
// Setup transaction timer
transPending = false;
transTimer = new Timer();
transTimerTask = new TimerTask() {
@Override
public void run() {
transactionTimeout();
}
};
// Setup and start the periodic timer
timeToNextUpdateMs = 0;
updateTimer = new Timer();
updateTimerTask = new TimerTask() {
@Override
public void run() {
processPeriodicUpdates();
}
};
updateTimer.scheduleAtFixedRate(updateTimerTask, 1000, 1000);
// Setup and start the stats timer
txErrors = 0;
txRetries = 0;
}
/**
* Register a new object for periodic updates (if enabled)
*/
private void registerObject(UAVObject obj)
{
// Setup object for periodic updates
addObject(obj);
// Setup object for telemetry updates
updateObject(obj);
}
/**
* Add an object in the list used for periodic updates
*/
private void addObject(UAVObject obj)
{
// Check if object type is already in the list
ListIterator<ObjectTimeInfo> li = objList.listIterator();
while(li.hasNext()) {
ObjectTimeInfo n = li.next();
if( n.obj.getObjID() == obj.getObjID() )
{
// Object type (not instance!) is already in the list, do nothing
return;
}
}
// If this point is reached, then the object type is new, let's add it
ObjectTimeInfo timeInfo = new ObjectTimeInfo();
timeInfo.obj = obj;
timeInfo.timeToNextUpdateMs = 0;
timeInfo.updatePeriodMs = 0;
objList.add(timeInfo);
}
/**
* Update the object's timers
*/
private void setUpdatePeriod(UAVObject obj, int periodMs)
{
// Find object type (not instance!) and update its period
ListIterator<ObjectTimeInfo> li = objList.listIterator();
while(li.hasNext()) {
ObjectTimeInfo n = li.next();
if ( n.obj.getObjID() == obj.getObjID() )
{
n.updatePeriodMs = periodMs;
n.timeToNextUpdateMs = (int) (periodMs * (new java.util.Random()).nextDouble()); // avoid bunching of updates
}
}
}
/**
* Connect to all instances of an object depending on the event mask specified
*/
private void connectToObjectInstances(UAVObject obj, int eventMask)
{
List<UAVObject> objs = objMngr.getObjectInstances(obj.getObjID());
ListIterator<UAVObject> li = objs.listIterator();
while(li.hasNext())
{
obj = li.next();
//TODO: Disconnect all
// obj.disconnect(this);
// Connect only the selected events
if ( (eventMask&EV_UNPACKED) != 0)
{
obj.addUnpackedObserver(new Observer() {
public void update(Observable observable, Object data) {
objectUnpacked( (UAVObject) data);
}
});
}
if ( (eventMask&EV_UPDATED) != 0)
{
obj.addUpdatedAutoObserver(new Observer() {
public void update(Observable observable, Object data) {
objectUpdatedAuto( (UAVObject) data);
}
});
}
if ( (eventMask&EV_UPDATED_MANUAL) != 0)
{
obj.addUpdatedManualObserver(new Observer() {
public void update(Observable observable, Object data) {
objectUpdatedManual( (UAVObject) data);
}
});
}
if ( (eventMask&EV_UPDATE_REQ) != 0)
{
obj.addUpdatedObserver(new Observer() {
public void update(Observable observable, Object data) {
updateRequested( (UAVObject) data);
}
});
}
}
}
/**
* Update an object based on its metadata properties
*/
private void updateObject(UAVObject obj)
{
// Get metadata
UAVObject.Metadata metadata = obj.getMetadata();
// Setup object depending on update mode
int eventMask;
if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_PERIODIC )
{
// Set update period
setUpdatePeriod(obj, metadata.gcsTelemetryUpdatePeriod);
// Connect signals for all instances
eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ;
if(obj.isMetadata())
eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events)
connectToObjectInstances(obj, eventMask);
}
else if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_ONCHANGE )
{
// Set update period
setUpdatePeriod(obj, 0);
// Connect signals for all instances
eventMask = EV_UPDATED | EV_UPDATED_MANUAL | EV_UPDATE_REQ;
if(obj.isMetadata())
eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events)
connectToObjectInstances(obj, eventMask);
}
else if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_MANUAL )
{
// Set update period
setUpdatePeriod(obj, 0);
// Connect signals for all instances
eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ;
if(obj.isMetadata())
eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events)
connectToObjectInstances(obj, eventMask);
}
else if ( metadata.gcsTelemetryUpdateMode == UAVObject.UpdateMode.UPDATEMODE_NEVER )
{
// Set update period
setUpdatePeriod(obj, 0);
// Disconnect from object
connectToObjectInstances(obj, 0);
}
}
/**
* Called when a transaction is successfully completed (uavtalk event)
*/
private void transactionCompleted(UAVObject obj)
{
// Check if there is a pending transaction and the objects match
if ( transPending && transInfo.obj.getObjID() == obj.getObjID() )
{
// qDebug() << QString("Telemetry: transaction completed for %1").arg(obj->getName());
// Complete transaction
transTimer.cancel();
transPending = false;
// Send signal
obj.transactionCompleted(true);
// Process new object updates from queue
processObjectQueue();
} else
{
// qDebug() << "Error: received a transaction completed when did not expect it.";
}
}
/**
* Called when a transaction is not completed within the timeout period (timer event)
*/
private void transactionTimeout()
{
// qDebug() << "Telemetry: transaction timeout.";
transTimer.cancel();
// Proceed only if there is a pending transaction
if ( transPending )
{
// Check if more retries are pending
if (transInfo.retriesRemaining > 0)
{
--transInfo.retriesRemaining;
processObjectTransaction();
++txRetries;
}
else
{
// Terminate transaction
utalk.cancelTransaction();
transPending = false;
// Send signal
transInfo.obj.transactionCompleted(false);
// Process new object updates from queue
processObjectQueue();
++txErrors;
}
}
}
/**
* Start an object transaction with UAVTalk, all information is stored in transInfo
*/
private void processObjectTransaction()
{
if (transPending)
{
// qDebug() << tr("Process Object transaction for %1").arg(transInfo.obj->getName());
// Initiate transaction
if (transInfo.objRequest)
{
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
}
else
{
utalk.sendObject(transInfo.obj, transInfo.acked == Acked.TRUE, transInfo.allInstances);
}
// Start timer if a response is expected
if ( transInfo.objRequest || transInfo.acked == Acked.TRUE )
{
transTimer.scheduleAtFixedRate(transTimerTask, REQ_TIMEOUT_MS, REQ_TIMEOUT_MS);
}
else
{
transTimer.cancel();
transPending = false;
}
} else
{
// qDebug() << "Error: inside of processObjectTransaction with no transPending";
}
}
/**
* Process the event received from an object
*/
private void processObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority)
{
// Push event into queue
// qDebug() << "Push event into queue for obj " << QString("%1 event %2").arg(obj->getName()).arg(event);
ObjectQueueInfo objInfo = new ObjectQueueInfo();
objInfo.obj = obj;
objInfo.event = event;
objInfo.allInstances = allInstances;
if (priority)
{
if ( objPriorityQueue.size() < MAX_QUEUE_SIZE )
{
objPriorityQueue.add(objInfo);
}
else
{
++txErrors;
obj.transactionCompleted(false);
//qxtLog->warning(tr("Telemetry: priority event queue is full, event lost (%1)").arg(obj->getName()));
}
}
else
{
if ( objQueue.size() < MAX_QUEUE_SIZE )
{
objQueue.add(objInfo);
}
else
{
++txErrors;
obj.transactionCompleted(false);
}
}
// If there is no transaction in progress then process event
if (!transPending)
{
// qDebug() << "No transaction pending, process object queue...";
processObjectQueue();
} else
{
// qDebug() << "Transaction pending, DO NOT process object queue...";
}
}
/**
* Process events from the object queue
*/
private void processObjectQueue()
{
// qDebug() << "Process object queue " << tr("- Depth (%1 %2)").arg(objQueue.length()).arg(objPriorityQueue.length());
// Don nothing if a transaction is already in progress (should not happen)
if (transPending)
{
// qxtLog->error("Telemetry: Dequeue while a transaction pending!");
return;
}
// Get object information from queue (first the priority and then the regular queue)
ObjectQueueInfo objInfo;
if ( !objPriorityQueue.isEmpty() )
{
objInfo = objPriorityQueue.remove();
}
else if ( !objQueue.isEmpty() )
{
objInfo = objQueue.remove();
}
else
{
return;
}
// Check if a connection has been established, only process GCSTelemetryStats updates
// (used to establish the connection)
if ( ((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0 )
{
objQueue.clear();
if ( objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID() )
{
objInfo.obj.transactionCompleted(false);
return;
}
}
// Setup transaction (skip if unpack event)
if ( objInfo.event != EV_UNPACKED )
{
UAVObject.Metadata metadata = objInfo.obj.getMetadata();
transInfo.obj = objInfo.obj;
transInfo.allInstances = objInfo.allInstances;
transInfo.retriesRemaining = MAX_RETRIES;
transInfo.acked = metadata.gcsTelemetryAcked;
if ( objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL )
{
transInfo.objRequest = false;
}
else if ( objInfo.event == EV_UPDATE_REQ )
{
transInfo.objRequest = true;
}
// Start transaction
transPending = true;
processObjectTransaction();
} else
{
// qDebug() << QString("Process object queue: this is an unpack event for %1").arg(objInfo.obj->getName());
}
// If this is a metaobject then make necessary telemetry updates
if (objInfo.obj.isMetadata())
{
UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj;
updateObject( metaobj.getParentObject() );
}
// The fact we received an unpacked event does not mean that
// we do not have additional objects still in the queue,
// so we have to reschedule queue processing to make sure they are not
// stuck:
if ( objInfo.event == EV_UNPACKED )
processObjectQueue();
}
/**
* Check is any objects are pending for periodic updates
* TODO: Clean-up
*/
private synchronized void processPeriodicUpdates()
{
// Stop timer
updateTimer.cancel();
// Iterate through each object and update its timer, if zero then transmit object.
// Also calculate smallest delay to next update (will be used for setting timeToNextUpdateMs)
int minDelay = MAX_UPDATE_PERIOD_MS;
ObjectTimeInfo objinfo;
int elapsedMs = 0;
long startTime;
int offset;
ListIterator<ObjectTimeInfo> li = objList.listIterator();
while(li.hasNext())
{
objinfo = li.next();
// If object is configured for periodic updates
if (objinfo.updatePeriodMs > 0)
{
objinfo.timeToNextUpdateMs -= timeToNextUpdateMs;
// Check if time for the next update
if (objinfo.timeToNextUpdateMs <= 0)
{
// Reset timer
offset = (-objinfo.timeToNextUpdateMs) % objinfo.updatePeriodMs;
objinfo.timeToNextUpdateMs = objinfo.updatePeriodMs - offset;
// Send object
startTime = System.currentTimeMillis();
processObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false);
elapsedMs = (int) (System.currentTimeMillis() - startTime);
// Update timeToNextUpdateMs with the elapsed delay of sending the object;
timeToNextUpdateMs += elapsedMs;
}
// Update minimum delay
if (objinfo.timeToNextUpdateMs < minDelay)
{
minDelay = objinfo.timeToNextUpdateMs;
}
}
}
// Check if delay for the next update is too short
if (minDelay < MIN_UPDATE_PERIOD_MS)
{
minDelay = MIN_UPDATE_PERIOD_MS;
}
// Done
timeToNextUpdateMs = minDelay;
// Restart timer
//updateTimer->start(timeToNextUpdateMs);
updateTimer.scheduleAtFixedRate(updateTimerTask, timeToNextUpdateMs, timeToNextUpdateMs);
}
public TelemetryStats getStats()
{
// Get UAVTalk stats
UAVTalk.ComStats utalkStats = utalk.getStats();
// Update stats
TelemetryStats stats = new TelemetryStats();
stats.txBytes = utalkStats.txBytes;
stats.rxBytes = utalkStats.rxBytes;
stats.txObjectBytes = utalkStats.txObjectBytes;
stats.rxObjectBytes = utalkStats.rxObjectBytes;
stats.rxObjects = utalkStats.rxObjects;
stats.txObjects = utalkStats.txObjects;
stats.txErrors = utalkStats.txErrors + txErrors;
stats.rxErrors = utalkStats.rxErrors;
stats.txRetries = txRetries;
// Done
return stats;
}
public synchronized void resetStats()
{
utalk.resetStats();
txErrors = 0;
txRetries = 0;
}
private synchronized void objectUpdatedAuto(UAVObject obj)
{
processObjectUpdates(obj, EV_UPDATED, false, true);
}
private synchronized void objectUpdatedManual(UAVObject obj)
{
processObjectUpdates(obj, EV_UPDATED_MANUAL, false, true);
}
private synchronized void objectUnpacked(UAVObject obj)
{
processObjectUpdates(obj, EV_UNPACKED, false, true);
}
private synchronized void updateRequested(UAVObject obj)
{
processObjectUpdates(obj, EV_UPDATE_REQ, false, true);
}
private void newObject(UAVObject obj)
{
registerObject(obj);
}
private synchronized void newInstance(UAVObject obj)
{
registerObject(obj);
}
/**
* Private variables
*/
private TelemetryStats stats;
private UAVObjectManager objMngr;
private UAVTalk utalk;
private UAVObject gcsStatsObj;
private List<ObjectTimeInfo> objList;
private Queue<ObjectQueueInfo> objQueue = new LinkedList<ObjectQueueInfo>();
private Queue<ObjectQueueInfo> objPriorityQueue = new LinkedList<ObjectQueueInfo>();
private ObjectTransactionInfo transInfo;
private boolean transPending;
private Timer updateTimer;
private TimerTask updateTimerTask;
private Timer transTimer;
private TimerTask transTimerTask;
private int timeToNextUpdateMs;
private int txErrors;
private int txRetries;
/**
* Private constants
*/
private static final int REQ_TIMEOUT_MS = 250;
private static final int MAX_RETRIES = 2;
private static final int MAX_UPDATE_PERIOD_MS = 1000;
private static final int MIN_UPDATE_PERIOD_MS = 1;
private static final int MAX_QUEUE_SIZE = 20;
}

View File

@ -123,7 +123,7 @@ public class TelemetryMonitor {
// Connect to object
//connect(obj, SIGNAL(transactionCompleted(UAVObject*,bool)), this, SLOT(transactionCompleted(UAVObject*,bool)));
// Request update
obj.requestUpdate();
tel.requestUpdate(obj);
objPending = obj;
}

View File

@ -8,7 +8,7 @@ import java.util.Observer;
import java.util.Observable;
public abstract class UAVObject {
public class CallbackListener extends Observable {
private UAVObject parent;
@ -20,6 +20,31 @@ public abstract class UAVObject {
setChanged();
notifyObservers(parent);
}
public void event (Object data) {
setChanged();
notifyObservers(data);
}
}
public class TransactionResult {
public UAVObject obj;
public boolean success;
public TransactionResult(UAVObject obj, boolean success) {
this.obj = obj;
this.success = success;
}
}
private CallbackListener transactionCompletedListeners = new CallbackListener(this);
public void addTransactionCompleted(Observer o) {
synchronized(transactionCompletedListeners) {
transactionCompletedListeners.addObserver(o);
}
}
void transactionCompleted(boolean status) {
synchronized(transactionCompletedListeners) {
transactionCompletedListeners.event(new TransactionResult(this,status));
}
}
private CallbackListener updatedListeners = new CallbackListener(this);
@ -47,6 +72,30 @@ public abstract class UAVObject {
}
}
private CallbackListener updatedAutoListeners = new CallbackListener(this);
public void addUpdatedAutoObserver(Observer o) {
synchronized(updatedAutoListeners) {
updatedAutoListeners.addObserver(o);
}
}
void updatedAuto() {
synchronized(updatedAutoListeners) {
updatedAutoListeners.event();
}
}
private CallbackListener updatedManualListeners = new CallbackListener(this);
public void addUpdatedManualObserver(Observer o) {
synchronized(updatedManualListeners) {
updatedManualListeners.addObserver(o);
}
}
void updatedManual() {
synchronized(updatedManualListeners) {
updatedManualListeners.event();
}
}
public abstract boolean isMetadata();
/**

View File

@ -102,7 +102,6 @@ public class UAVObjectManager {
newInstance.event(newObj);
}
obj.initialize(mobj);
//emit new instance signal
instList.add(obj);
newInstance.event(obj);
@ -155,7 +154,7 @@ public class UAVObjectManager {
List<UAVObject> ls = new ArrayList<UAVObject>();
ls.add(obj);
objects.add(ls);
//emit newObject(obj);
newObject.event(obj);
}
/**

View File

@ -7,8 +7,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Observable;
public class UAVTalk {
public class UAVTalk extends Observable{
private Thread inputProcessingThread = null;
/**
@ -632,8 +633,8 @@ public class UAVTalk {
if (respObj != null && respObj.getObjID() == obj.getObjID() && (respObj.getInstID() == obj.getInstID() || respAllInstances))
{
respObj = null;
// TODO: Signals
// emit transactionCompleted(obj);
setChanged();
notifyObservers(obj);
}
}