1
0
mirror of https://bitbucket.org/librepilot/librepilot.git synced 2025-02-21 11:54:15 +01:00

AndroidGCS: Handler based telemetry. Now reschedule transactions if one is

pending.
This commit is contained in:
James Cotton 2012-08-15 00:01:14 -05:00
parent 653702ac23
commit 1bbfb35446

View File

@ -30,10 +30,8 @@ import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Observable; import java.util.Observable;
import java.util.Observer; import java.util.Observer;
import java.util.Queue;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import junit.framework.Assert; import junit.framework.Assert;
import android.os.Handler; import android.os.Handler;
@ -49,8 +47,8 @@ public class Telemetry {
private final String TAG = "Telemetry"; private final String TAG = "Telemetry";
public static int LOGLEVEL = 0; public static int LOGLEVEL = 0;
public static boolean WARN = LOGLEVEL > 2; public static boolean DEBUG = LOGLEVEL > 2;
public static boolean DEBUG = LOGLEVEL > 1; public static boolean WARN = LOGLEVEL > 1;
public static boolean ERROR = LOGLEVEL > 0; public static boolean ERROR = LOGLEVEL > 0;
public class TelemetryStats { public class TelemetryStats {
@ -391,8 +389,8 @@ public class Telemetry {
*/ */
private void transactionCompleted(UAVObject obj, boolean result) private void transactionCompleted(UAVObject obj, boolean result)
throws IOException { throws IOException {
if (DEBUG) if (DEBUG) Log.d(TAG, "UAVTalk transactionCompleted");
Log.d(TAG, "UAVTalk transactionCompleted");
// Check if there is a pending transaction and the objects match // Check if there is a pending transaction and the objects match
if (transPending && transInfo.obj.getObjID() == obj.getObjID()) { if (transPending && transInfo.obj.getObjID() == obj.getObjID()) {
if (DEBUG) Log.d(TAG, "Telemetry: transaction completed for " + obj.getName()); if (DEBUG) Log.d(TAG, "Telemetry: transaction completed for " + obj.getName());
@ -406,9 +404,7 @@ public class Telemetry {
//Send signal //Send signal
obj.transactionCompleted(result); obj.transactionCompleted(result);
} else { } else {
if (ERROR) if (ERROR) Log.e(TAG, "Error: received a transaction completed when did not expect it.");
Log.e(TAG,
"Error: received a transaction completed when did not expect it.");
transPending = false; transPending = false;
} }
} }
@ -433,16 +429,11 @@ public class Telemetry {
++txRetries; ++txRetries;
} else { } else {
if (ERROR) if (ERROR)
Log.e(TAG, Log.e(TAG, "Transaction failed for: " + transInfo.obj.getName());
"Transaction failed for: "
+ transInfo.obj.getName());
// Terminate transaction. This triggers UAVTalk to send a // Terminate transaction. This triggers UAVTalk to send a transaction
// transaction // failed signal which will make the next queue entry be processed
// failed signal which will make the next queue entry be
// processed
// Note this is UAVTalk listener TransactionFailed function // Note this is UAVTalk listener TransactionFailed function
// and not the
// object specific transaction failed. // object specific transaction failed.
utalk.cancelPendingTransaction(transInfo.obj); utalk.cancelPendingTransaction(transInfo.obj);
++txErrors; ++txErrors;
@ -460,9 +451,8 @@ public class Telemetry {
private void processObjectTransaction() throws IOException { private void processObjectTransaction() throws IOException {
if (transPending) { if (transPending) {
if (DEBUG) if (DEBUG)
Log.d(TAG, Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName());
"Process Object transaction for "
+ transInfo.obj.getName());
// Initiate transaction // Initiate transaction
if (transInfo.objRequest) { if (transInfo.objRequest) {
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
@ -470,6 +460,7 @@ public class Telemetry {
utalk.sendObject(transInfo.obj, transInfo.acked, utalk.sendObject(transInfo.obj, transInfo.acked,
transInfo.allInstances); transInfo.allInstances);
} }
// Start timer if a response is expected // Start timer if a response is expected
if (transInfo.objRequest || transInfo.acked) { if (transInfo.objRequest || transInfo.acked) {
transTimerSetPeriod(REQ_TIMEOUT_MS); transTimerSetPeriod(REQ_TIMEOUT_MS);
@ -481,99 +472,10 @@ public class Telemetry {
} }
} else { } else {
if (ERROR) if (ERROR)
Log.e(TAG, Log.e(TAG, "Error: inside of processObjectTransaction with no transPending");
"Error: inside of processObjectTransaction with no transPending");
} }
} }
/**
* Process events from the object queue
*
* @throws IOException
*/
private void processObjectQueue() throws IOException {
if (DEBUG)
Log.d(TAG, "Process object queue - Depth " + objQueue.size()
+ " priority " + objPriorityQueue.size());
// Don nothing if a transaction is already in progress (should not
// happen)
if (transPending) {
if (WARN)
Log.e(TAG, "Dequeue while a transaction pending");
return;
}
// Get object information from queue (first the priority and then the
// regular queue)
ObjectQueueInfo objInfo;
synchronized (objPriorityQueue) {
if (!objPriorityQueue.isEmpty()) {
objInfo = objPriorityQueue.remove();
} else {
synchronized (objQueue) {
if (!objQueue.isEmpty()) {
objInfo = objQueue.remove();
} else {
return;
}
}
}
}
// Check if a connection has been established, only process
// GCSTelemetryStats updates
// (used to establish the connection)
gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
if (((String) gcsStatsObj.getField("Status").getValue())
.compareTo("Connected") != 0) {
objQueue.clear();
if (objInfo.obj.getObjID() != objMngr
.getObject("GCSTelemetryStats").getObjID()) {
if (DEBUG)
Log.d(TAG,
"transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected.");
objInfo.obj.transactionCompleted(false);
return;
}
}
// Setup transaction (skip if unpack event)
if (objInfo.event != EV_UNPACKED) {
UAVObject.Metadata metadata = objInfo.obj.getMetadata();
transInfo.obj = objInfo.obj;
transInfo.allInstances = objInfo.allInstances;
transInfo.retriesRemaining = MAX_RETRIES;
transInfo.acked = metadata.GetGcsTelemetryAcked();
if (objInfo.event == EV_UPDATED
|| objInfo.event == EV_UPDATED_MANUAL) {
transInfo.objRequest = false;
} else if (objInfo.event == EV_UPDATE_REQ) {
transInfo.objRequest = true;
}
// Start transaction
transPending = true;
processObjectTransaction();
} else {
// qDebug() <<
// QString("Process object queue: this is an unpack event for %1").arg(objInfo.obj->getName());
}
// If this is a metaobject then make necessary telemetry updates
if (objInfo.obj.isMetadata()) {
UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj;
updateObject(metaobj.getParentObject());
}
// The fact we received an unpacked event does not mean that
// we do not have additional objects still in the queue,
// so we have to reschedule queue processing to make sure they are not
// stuck:
if (objInfo.event == EV_UNPACKED && !transPending)
processObjectQueue();
}
/** /**
* Check is any objects are pending for periodic updates TODO: Clean-up * Check is any objects are pending for periodic updates TODO: Clean-up
* *
@ -697,9 +599,7 @@ public class Telemetry {
private final UAVTalk utalk; private final UAVTalk utalk;
private UAVObject gcsStatsObj; private UAVObject gcsStatsObj;
private final List<ObjectTimeInfo> objList = new ArrayList<ObjectTimeInfo>(); private final List<ObjectTimeInfo> objList = new ArrayList<ObjectTimeInfo>();
private final Queue<ObjectQueueInfo> objQueue = new ConcurrentLinkedQueue<ObjectQueueInfo>(); private ObjectTransactionInfo transInfo = new ObjectTransactionInfo();
private final Queue<ObjectQueueInfo> objPriorityQueue = new ConcurrentLinkedQueue<ObjectQueueInfo>();
private final ObjectTransactionInfo transInfo = new ObjectTransactionInfo();
private boolean transPending; private boolean transPending;
private Timer updateTimer; private Timer updateTimer;
@ -718,7 +618,6 @@ public class Telemetry {
private static final int MAX_RETRIES = 2; private static final int MAX_RETRIES = 2;
private static final int MAX_UPDATE_PERIOD_MS = 1000; private static final int MAX_UPDATE_PERIOD_MS = 1000;
private static final int MIN_UPDATE_PERIOD_MS = 1; private static final int MIN_UPDATE_PERIOD_MS = 1;
private static final int MAX_QUEUE_SIZE = 20;
private final ObjectUpdateHandler handler; private final ObjectUpdateHandler handler;
@ -733,9 +632,7 @@ public class Telemetry {
void enqueueObjectUpdates(UAVObject obj, int event, void enqueueObjectUpdates(UAVObject obj, int event,
boolean allInstances, boolean priority) { boolean allInstances, boolean priority) {
if (DEBUG) if (DEBUG) Log.d(TAG, "Enqueing update " + obj.getName() + " event " + event);
Log.d(TAG, "Enqueing update " + obj.getName() + " event "
+ event);
ObjectQueueInfo objInfo = new ObjectQueueInfo(); ObjectQueueInfo objInfo = new ObjectQueueInfo();
objInfo.obj = obj; objInfo.obj = obj;
@ -794,35 +691,15 @@ public class Telemetry {
// GCSTelemetryStats updates // GCSTelemetryStats updates
// (used to establish the connection) // (used to establish the connection)
gcsStatsObj = objMngr.getObject("GCSTelemetryStats"); gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
if (((String) gcsStatsObj.getField("Status").getValue()) if (((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0) {
.compareTo("Connected") != 0) { if (objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID()) {
if (objInfo.obj.getObjID() != objMngr.getObject(
"GCSTelemetryStats").getObjID()) {
if (DEBUG) if (DEBUG)
Log.d(TAG, Log.d(TAG, "transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected.");
"transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected.");
objInfo.obj.transactionCompleted(false); objInfo.obj.transactionCompleted(false);
return; return;
} }
} }
// 2. Setup transaction (skip if unpack event)
if (objInfo.event != EV_UNPACKED) {
UAVObject.Metadata metadata = objInfo.obj.getMetadata();
transInfo.obj = objInfo.obj;
transInfo.allInstances = objInfo.allInstances;
transInfo.retriesRemaining = MAX_RETRIES;
transInfo.acked = metadata.GetGcsTelemetryAcked();
if (objInfo.event == EV_UPDATED
|| objInfo.event == EV_UPDATED_MANUAL) {
transInfo.objRequest = false;
} else if (objInfo.event == EV_UPDATE_REQ) {
transInfo.objRequest = true;
}
// Start transaction
transPending = true;
}
// If this is a metaobject then make necessary telemetry updates // If this is a metaobject then make necessary telemetry updates
// (this is why we catch unpack) // (this is why we catch unpack)
if (objInfo.obj.isMetadata()) { if (objInfo.obj.isMetadata()) {
@ -830,35 +707,64 @@ public class Telemetry {
updateObject(metaobj.getParentObject()); updateObject(metaobj.getParentObject());
} }
// 3. Execute transaction // 2. Setup transaction (skip if unpack event)
if (transPending) { ObjectTransactionInfo newTrans = new ObjectTransactionInfo();
try { boolean newTransactionPending = false;
if (objInfo.event != EV_UNPACKED) {
UAVObject.Metadata metadata = objInfo.obj.getMetadata();
newTrans.obj = objInfo.obj;
newTrans.allInstances = objInfo.allInstances;
newTrans.retriesRemaining = MAX_RETRIES;
newTrans.acked = metadata.GetGcsTelemetryAcked();
if (objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL) {
newTrans.objRequest = false;
} else if (objInfo.event == EV_UPDATE_REQ) {
newTrans.objRequest = true;
}
// Determine if this will schedule a new transaction
newTransactionPending = !(newTrans.objRequest || newTrans.acked);
// If there is a transaction pending and this would set up a new one reschedule it
if (transPending && newTransactionPending) {
if (WARN) Log.w(TAG, "Postponing transaction for" + newTrans.obj.getName());
handler.postDelayed(this, 100);
return;
}
synchronized (transInfo) {
transPending = newTransactionPending;
transInfo = newTrans;
if (DEBUG) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName()); if (DEBUG) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName());
// Initiate transaction // 3. Execute transaction
if (transInfo.objRequest) { try {
if (DEBUG) Log.d(TAG, "Sending object request");
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); // Initiate transaction
} else { if (transInfo.objRequest) {
if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName() + " " + transInfo.obj.toStringData()); if (DEBUG) Log.d(TAG, "Sending object request");
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances); utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
} else {
if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName() + " " + transInfo.obj.toStringData());
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
}
// TODO: Block if request expected (??)
if (transPending) {
transTimerSetPeriod(REQ_TIMEOUT_MS);
} else if (transTimer != null) {
synchronized(transTimer) {
transTimer.cancel();
transPending = false;
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
Log.e(TAG, "E");
e.printStackTrace();
} }
// TODO: Block if request expected (??)
if (transInfo.objRequest || transInfo.acked ) {
transTimerSetPeriod(REQ_TIMEOUT_MS);
} else {
synchronized(transTimer) {
transTimer.cancel();
transPending = false;
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
Log.e(TAG, "E");
e.printStackTrace();
} }
} }
} }