mirror of
https://bitbucket.org/librepilot/librepilot.git
synced 2025-01-17 02:52:12 +01:00
AndroidGCS: Handler based telemetry. Now reschedule transactions if one is
pending.
This commit is contained in:
parent
a78fd852b1
commit
9f326f28d8
@ -30,10 +30,8 @@ import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Observable;
|
||||
import java.util.Observer;
|
||||
import java.util.Queue;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import android.os.Handler;
|
||||
@ -49,8 +47,8 @@ public class Telemetry {
|
||||
|
||||
private final String TAG = "Telemetry";
|
||||
public static int LOGLEVEL = 0;
|
||||
public static boolean WARN = LOGLEVEL > 2;
|
||||
public static boolean DEBUG = LOGLEVEL > 1;
|
||||
public static boolean DEBUG = LOGLEVEL > 2;
|
||||
public static boolean WARN = LOGLEVEL > 1;
|
||||
public static boolean ERROR = LOGLEVEL > 0;
|
||||
|
||||
public class TelemetryStats {
|
||||
@ -391,8 +389,8 @@ public class Telemetry {
|
||||
*/
|
||||
private void transactionCompleted(UAVObject obj, boolean result)
|
||||
throws IOException {
|
||||
if (DEBUG)
|
||||
Log.d(TAG, "UAVTalk transactionCompleted");
|
||||
if (DEBUG) Log.d(TAG, "UAVTalk transactionCompleted");
|
||||
|
||||
// Check if there is a pending transaction and the objects match
|
||||
if (transPending && transInfo.obj.getObjID() == obj.getObjID()) {
|
||||
if (DEBUG) Log.d(TAG, "Telemetry: transaction completed for " + obj.getName());
|
||||
@ -406,9 +404,7 @@ public class Telemetry {
|
||||
//Send signal
|
||||
obj.transactionCompleted(result);
|
||||
} else {
|
||||
if (ERROR)
|
||||
Log.e(TAG,
|
||||
"Error: received a transaction completed when did not expect it.");
|
||||
if (ERROR) Log.e(TAG, "Error: received a transaction completed when did not expect it.");
|
||||
transPending = false;
|
||||
}
|
||||
}
|
||||
@ -433,16 +429,11 @@ public class Telemetry {
|
||||
++txRetries;
|
||||
} else {
|
||||
if (ERROR)
|
||||
Log.e(TAG,
|
||||
"Transaction failed for: "
|
||||
+ transInfo.obj.getName());
|
||||
Log.e(TAG, "Transaction failed for: " + transInfo.obj.getName());
|
||||
|
||||
// Terminate transaction. This triggers UAVTalk to send a
|
||||
// transaction
|
||||
// failed signal which will make the next queue entry be
|
||||
// processed
|
||||
// Terminate transaction. This triggers UAVTalk to send a transaction
|
||||
// failed signal which will make the next queue entry be processed
|
||||
// Note this is UAVTalk listener TransactionFailed function
|
||||
// and not the
|
||||
// object specific transaction failed.
|
||||
utalk.cancelPendingTransaction(transInfo.obj);
|
||||
++txErrors;
|
||||
@ -460,9 +451,8 @@ public class Telemetry {
|
||||
private void processObjectTransaction() throws IOException {
|
||||
if (transPending) {
|
||||
if (DEBUG)
|
||||
Log.d(TAG,
|
||||
"Process Object transaction for "
|
||||
+ transInfo.obj.getName());
|
||||
Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName());
|
||||
|
||||
// Initiate transaction
|
||||
if (transInfo.objRequest) {
|
||||
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
|
||||
@ -470,6 +460,7 @@ public class Telemetry {
|
||||
utalk.sendObject(transInfo.obj, transInfo.acked,
|
||||
transInfo.allInstances);
|
||||
}
|
||||
|
||||
// Start timer if a response is expected
|
||||
if (transInfo.objRequest || transInfo.acked) {
|
||||
transTimerSetPeriod(REQ_TIMEOUT_MS);
|
||||
@ -481,99 +472,10 @@ public class Telemetry {
|
||||
}
|
||||
} else {
|
||||
if (ERROR)
|
||||
Log.e(TAG,
|
||||
"Error: inside of processObjectTransaction with no transPending");
|
||||
Log.e(TAG, "Error: inside of processObjectTransaction with no transPending");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process events from the object queue
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void processObjectQueue() throws IOException {
|
||||
if (DEBUG)
|
||||
Log.d(TAG, "Process object queue - Depth " + objQueue.size()
|
||||
+ " priority " + objPriorityQueue.size());
|
||||
|
||||
// Don nothing if a transaction is already in progress (should not
|
||||
// happen)
|
||||
if (transPending) {
|
||||
if (WARN)
|
||||
Log.e(TAG, "Dequeue while a transaction pending");
|
||||
return;
|
||||
}
|
||||
|
||||
// Get object information from queue (first the priority and then the
|
||||
// regular queue)
|
||||
ObjectQueueInfo objInfo;
|
||||
synchronized (objPriorityQueue) {
|
||||
if (!objPriorityQueue.isEmpty()) {
|
||||
objInfo = objPriorityQueue.remove();
|
||||
} else {
|
||||
synchronized (objQueue) {
|
||||
if (!objQueue.isEmpty()) {
|
||||
objInfo = objQueue.remove();
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if a connection has been established, only process
|
||||
// GCSTelemetryStats updates
|
||||
// (used to establish the connection)
|
||||
gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
|
||||
if (((String) gcsStatsObj.getField("Status").getValue())
|
||||
.compareTo("Connected") != 0) {
|
||||
objQueue.clear();
|
||||
if (objInfo.obj.getObjID() != objMngr
|
||||
.getObject("GCSTelemetryStats").getObjID()) {
|
||||
if (DEBUG)
|
||||
Log.d(TAG,
|
||||
"transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected.");
|
||||
objInfo.obj.transactionCompleted(false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Setup transaction (skip if unpack event)
|
||||
if (objInfo.event != EV_UNPACKED) {
|
||||
UAVObject.Metadata metadata = objInfo.obj.getMetadata();
|
||||
transInfo.obj = objInfo.obj;
|
||||
transInfo.allInstances = objInfo.allInstances;
|
||||
transInfo.retriesRemaining = MAX_RETRIES;
|
||||
transInfo.acked = metadata.GetGcsTelemetryAcked();
|
||||
if (objInfo.event == EV_UPDATED
|
||||
|| objInfo.event == EV_UPDATED_MANUAL) {
|
||||
transInfo.objRequest = false;
|
||||
} else if (objInfo.event == EV_UPDATE_REQ) {
|
||||
transInfo.objRequest = true;
|
||||
}
|
||||
// Start transaction
|
||||
transPending = true;
|
||||
processObjectTransaction();
|
||||
} else {
|
||||
// qDebug() <<
|
||||
// QString("Process object queue: this is an unpack event for %1").arg(objInfo.obj->getName());
|
||||
}
|
||||
|
||||
// If this is a metaobject then make necessary telemetry updates
|
||||
if (objInfo.obj.isMetadata()) {
|
||||
UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj;
|
||||
updateObject(metaobj.getParentObject());
|
||||
}
|
||||
|
||||
// The fact we received an unpacked event does not mean that
|
||||
// we do not have additional objects still in the queue,
|
||||
// so we have to reschedule queue processing to make sure they are not
|
||||
// stuck:
|
||||
if (objInfo.event == EV_UNPACKED && !transPending)
|
||||
processObjectQueue();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Check is any objects are pending for periodic updates TODO: Clean-up
|
||||
*
|
||||
@ -697,9 +599,7 @@ public class Telemetry {
|
||||
private final UAVTalk utalk;
|
||||
private UAVObject gcsStatsObj;
|
||||
private final List<ObjectTimeInfo> objList = new ArrayList<ObjectTimeInfo>();
|
||||
private final Queue<ObjectQueueInfo> objQueue = new ConcurrentLinkedQueue<ObjectQueueInfo>();
|
||||
private final Queue<ObjectQueueInfo> objPriorityQueue = new ConcurrentLinkedQueue<ObjectQueueInfo>();
|
||||
private final ObjectTransactionInfo transInfo = new ObjectTransactionInfo();
|
||||
private ObjectTransactionInfo transInfo = new ObjectTransactionInfo();
|
||||
private boolean transPending;
|
||||
|
||||
private Timer updateTimer;
|
||||
@ -718,7 +618,6 @@ public class Telemetry {
|
||||
private static final int MAX_RETRIES = 2;
|
||||
private static final int MAX_UPDATE_PERIOD_MS = 1000;
|
||||
private static final int MIN_UPDATE_PERIOD_MS = 1;
|
||||
private static final int MAX_QUEUE_SIZE = 20;
|
||||
|
||||
private final ObjectUpdateHandler handler;
|
||||
|
||||
@ -733,9 +632,7 @@ public class Telemetry {
|
||||
void enqueueObjectUpdates(UAVObject obj, int event,
|
||||
boolean allInstances, boolean priority) {
|
||||
|
||||
if (DEBUG)
|
||||
Log.d(TAG, "Enqueing update " + obj.getName() + " event "
|
||||
+ event);
|
||||
if (DEBUG) Log.d(TAG, "Enqueing update " + obj.getName() + " event " + event);
|
||||
|
||||
ObjectQueueInfo objInfo = new ObjectQueueInfo();
|
||||
objInfo.obj = obj;
|
||||
@ -794,35 +691,15 @@ public class Telemetry {
|
||||
// GCSTelemetryStats updates
|
||||
// (used to establish the connection)
|
||||
gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
|
||||
if (((String) gcsStatsObj.getField("Status").getValue())
|
||||
.compareTo("Connected") != 0) {
|
||||
if (objInfo.obj.getObjID() != objMngr.getObject(
|
||||
"GCSTelemetryStats").getObjID()) {
|
||||
if (((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0) {
|
||||
if (objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID()) {
|
||||
if (DEBUG)
|
||||
Log.d(TAG,
|
||||
"transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected.");
|
||||
Log.d(TAG, "transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected.");
|
||||
objInfo.obj.transactionCompleted(false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Setup transaction (skip if unpack event)
|
||||
if (objInfo.event != EV_UNPACKED) {
|
||||
UAVObject.Metadata metadata = objInfo.obj.getMetadata();
|
||||
transInfo.obj = objInfo.obj;
|
||||
transInfo.allInstances = objInfo.allInstances;
|
||||
transInfo.retriesRemaining = MAX_RETRIES;
|
||||
transInfo.acked = metadata.GetGcsTelemetryAcked();
|
||||
if (objInfo.event == EV_UPDATED
|
||||
|| objInfo.event == EV_UPDATED_MANUAL) {
|
||||
transInfo.objRequest = false;
|
||||
} else if (objInfo.event == EV_UPDATE_REQ) {
|
||||
transInfo.objRequest = true;
|
||||
}
|
||||
// Start transaction
|
||||
transPending = true;
|
||||
}
|
||||
|
||||
// If this is a metaobject then make necessary telemetry updates
|
||||
// (this is why we catch unpack)
|
||||
if (objInfo.obj.isMetadata()) {
|
||||
@ -830,35 +707,64 @@ public class Telemetry {
|
||||
updateObject(metaobj.getParentObject());
|
||||
}
|
||||
|
||||
// 3. Execute transaction
|
||||
if (transPending) {
|
||||
try {
|
||||
// 2. Setup transaction (skip if unpack event)
|
||||
ObjectTransactionInfo newTrans = new ObjectTransactionInfo();
|
||||
boolean newTransactionPending = false;
|
||||
if (objInfo.event != EV_UNPACKED) {
|
||||
UAVObject.Metadata metadata = objInfo.obj.getMetadata();
|
||||
newTrans.obj = objInfo.obj;
|
||||
newTrans.allInstances = objInfo.allInstances;
|
||||
newTrans.retriesRemaining = MAX_RETRIES;
|
||||
newTrans.acked = metadata.GetGcsTelemetryAcked();
|
||||
if (objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL) {
|
||||
newTrans.objRequest = false;
|
||||
} else if (objInfo.event == EV_UPDATE_REQ) {
|
||||
newTrans.objRequest = true;
|
||||
}
|
||||
|
||||
// Determine if this will schedule a new transaction
|
||||
newTransactionPending = !(newTrans.objRequest || newTrans.acked);
|
||||
|
||||
// If there is a transaction pending and this would set up a new one reschedule it
|
||||
if (transPending && newTransactionPending) {
|
||||
if (WARN) Log.w(TAG, "Postponing transaction for" + newTrans.obj.getName());
|
||||
handler.postDelayed(this, 100);
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (transInfo) {
|
||||
transPending = newTransactionPending;
|
||||
transInfo = newTrans;
|
||||
|
||||
if (DEBUG) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName());
|
||||
|
||||
// Initiate transaction
|
||||
if (transInfo.objRequest) {
|
||||
if (DEBUG) Log.d(TAG, "Sending object request");
|
||||
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
|
||||
} else {
|
||||
if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName() + " " + transInfo.obj.toStringData());
|
||||
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
|
||||
// 3. Execute transaction
|
||||
try {
|
||||
|
||||
// Initiate transaction
|
||||
if (transInfo.objRequest) {
|
||||
if (DEBUG) Log.d(TAG, "Sending object request");
|
||||
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
|
||||
} else {
|
||||
if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName() + " " + transInfo.obj.toStringData());
|
||||
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
|
||||
}
|
||||
|
||||
|
||||
// TODO: Block if request expected (??)
|
||||
if (transPending) {
|
||||
transTimerSetPeriod(REQ_TIMEOUT_MS);
|
||||
} else if (transTimer != null) {
|
||||
synchronized(transTimer) {
|
||||
transTimer.cancel();
|
||||
transPending = false;
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// TODO Auto-generated catch block
|
||||
Log.e(TAG, "E");
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
|
||||
// TODO: Block if request expected (??)
|
||||
if (transInfo.objRequest || transInfo.acked ) {
|
||||
transTimerSetPeriod(REQ_TIMEOUT_MS);
|
||||
} else {
|
||||
synchronized(transTimer) {
|
||||
transTimer.cancel();
|
||||
transPending = false;
|
||||
}
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
// TODO Auto-generated catch block
|
||||
Log.e(TAG, "E");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user