diff --git a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java index 3e4007777..e514e61c4 100644 --- a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java +++ b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java @@ -30,10 +30,8 @@ import java.util.List; import java.util.ListIterator; import java.util.Observable; import java.util.Observer; -import java.util.Queue; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.ConcurrentLinkedQueue; import junit.framework.Assert; import android.os.Handler; @@ -49,8 +47,8 @@ public class Telemetry { private final String TAG = "Telemetry"; public static int LOGLEVEL = 0; - public static boolean WARN = LOGLEVEL > 2; - public static boolean DEBUG = LOGLEVEL > 1; + public static boolean DEBUG = LOGLEVEL > 2; + public static boolean WARN = LOGLEVEL > 1; public static boolean ERROR = LOGLEVEL > 0; public class TelemetryStats { @@ -391,8 +389,8 @@ public class Telemetry { */ private void transactionCompleted(UAVObject obj, boolean result) throws IOException { - if (DEBUG) - Log.d(TAG, "UAVTalk transactionCompleted"); + if (DEBUG) Log.d(TAG, "UAVTalk transactionCompleted"); + // Check if there is a pending transaction and the objects match if (transPending && transInfo.obj.getObjID() == obj.getObjID()) { if (DEBUG) Log.d(TAG, "Telemetry: transaction completed for " + obj.getName()); @@ -406,9 +404,7 @@ public class Telemetry { //Send signal obj.transactionCompleted(result); } else { - if (ERROR) - Log.e(TAG, - "Error: received a transaction completed when did not expect it."); + if (ERROR) Log.e(TAG, "Error: received a transaction completed when did not expect it."); transPending = false; } } @@ -433,16 +429,11 @@ public class Telemetry { ++txRetries; } else { if (ERROR) - Log.e(TAG, - "Transaction failed for: " - + transInfo.obj.getName()); + Log.e(TAG, "Transaction failed for: " + transInfo.obj.getName()); - // Terminate transaction. This triggers UAVTalk to send a - // transaction - // failed signal which will make the next queue entry be - // processed + // Terminate transaction. This triggers UAVTalk to send a transaction + // failed signal which will make the next queue entry be processed // Note this is UAVTalk listener TransactionFailed function - // and not the // object specific transaction failed. utalk.cancelPendingTransaction(transInfo.obj); ++txErrors; @@ -460,9 +451,8 @@ public class Telemetry { private void processObjectTransaction() throws IOException { if (transPending) { if (DEBUG) - Log.d(TAG, - "Process Object transaction for " - + transInfo.obj.getName()); + Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName()); + // Initiate transaction if (transInfo.objRequest) { utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); @@ -470,6 +460,7 @@ public class Telemetry { utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances); } + // Start timer if a response is expected if (transInfo.objRequest || transInfo.acked) { transTimerSetPeriod(REQ_TIMEOUT_MS); @@ -481,99 +472,10 @@ public class Telemetry { } } else { if (ERROR) - Log.e(TAG, - "Error: inside of processObjectTransaction with no transPending"); + Log.e(TAG, "Error: inside of processObjectTransaction with no transPending"); } } - /** - * Process events from the object queue - * - * @throws IOException - */ - private void processObjectQueue() throws IOException { - if (DEBUG) - Log.d(TAG, "Process object queue - Depth " + objQueue.size() - + " priority " + objPriorityQueue.size()); - - // Don nothing if a transaction is already in progress (should not - // happen) - if (transPending) { - if (WARN) - Log.e(TAG, "Dequeue while a transaction pending"); - return; - } - - // Get object information from queue (first the priority and then the - // regular queue) - ObjectQueueInfo objInfo; - synchronized (objPriorityQueue) { - if (!objPriorityQueue.isEmpty()) { - objInfo = objPriorityQueue.remove(); - } else { - synchronized (objQueue) { - if (!objQueue.isEmpty()) { - objInfo = objQueue.remove(); - } else { - return; - } - } - } - } - - // Check if a connection has been established, only process - // GCSTelemetryStats updates - // (used to establish the connection) - gcsStatsObj = objMngr.getObject("GCSTelemetryStats"); - if (((String) gcsStatsObj.getField("Status").getValue()) - .compareTo("Connected") != 0) { - objQueue.clear(); - if (objInfo.obj.getObjID() != objMngr - .getObject("GCSTelemetryStats").getObjID()) { - if (DEBUG) - Log.d(TAG, - "transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected."); - objInfo.obj.transactionCompleted(false); - return; - } - } - - // Setup transaction (skip if unpack event) - if (objInfo.event != EV_UNPACKED) { - UAVObject.Metadata metadata = objInfo.obj.getMetadata(); - transInfo.obj = objInfo.obj; - transInfo.allInstances = objInfo.allInstances; - transInfo.retriesRemaining = MAX_RETRIES; - transInfo.acked = metadata.GetGcsTelemetryAcked(); - if (objInfo.event == EV_UPDATED - || objInfo.event == EV_UPDATED_MANUAL) { - transInfo.objRequest = false; - } else if (objInfo.event == EV_UPDATE_REQ) { - transInfo.objRequest = true; - } - // Start transaction - transPending = true; - processObjectTransaction(); - } else { - // qDebug() << - // QString("Process object queue: this is an unpack event for %1").arg(objInfo.obj->getName()); - } - - // If this is a metaobject then make necessary telemetry updates - if (objInfo.obj.isMetadata()) { - UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj; - updateObject(metaobj.getParentObject()); - } - - // The fact we received an unpacked event does not mean that - // we do not have additional objects still in the queue, - // so we have to reschedule queue processing to make sure they are not - // stuck: - if (objInfo.event == EV_UNPACKED && !transPending) - processObjectQueue(); - - } - /** * Check is any objects are pending for periodic updates TODO: Clean-up * @@ -697,9 +599,7 @@ public class Telemetry { private final UAVTalk utalk; private UAVObject gcsStatsObj; private final List objList = new ArrayList(); - private final Queue objQueue = new ConcurrentLinkedQueue(); - private final Queue objPriorityQueue = new ConcurrentLinkedQueue(); - private final ObjectTransactionInfo transInfo = new ObjectTransactionInfo(); + private ObjectTransactionInfo transInfo = new ObjectTransactionInfo(); private boolean transPending; private Timer updateTimer; @@ -718,7 +618,6 @@ public class Telemetry { private static final int MAX_RETRIES = 2; private static final int MAX_UPDATE_PERIOD_MS = 1000; private static final int MIN_UPDATE_PERIOD_MS = 1; - private static final int MAX_QUEUE_SIZE = 20; private final ObjectUpdateHandler handler; @@ -733,9 +632,7 @@ public class Telemetry { void enqueueObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) { - if (DEBUG) - Log.d(TAG, "Enqueing update " + obj.getName() + " event " - + event); + if (DEBUG) Log.d(TAG, "Enqueing update " + obj.getName() + " event " + event); ObjectQueueInfo objInfo = new ObjectQueueInfo(); objInfo.obj = obj; @@ -794,35 +691,15 @@ public class Telemetry { // GCSTelemetryStats updates // (used to establish the connection) gcsStatsObj = objMngr.getObject("GCSTelemetryStats"); - if (((String) gcsStatsObj.getField("Status").getValue()) - .compareTo("Connected") != 0) { - if (objInfo.obj.getObjID() != objMngr.getObject( - "GCSTelemetryStats").getObjID()) { + if (((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0) { + if (objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID()) { if (DEBUG) - Log.d(TAG, - "transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected."); + Log.d(TAG, "transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected."); objInfo.obj.transactionCompleted(false); return; } } - // 2. Setup transaction (skip if unpack event) - if (objInfo.event != EV_UNPACKED) { - UAVObject.Metadata metadata = objInfo.obj.getMetadata(); - transInfo.obj = objInfo.obj; - transInfo.allInstances = objInfo.allInstances; - transInfo.retriesRemaining = MAX_RETRIES; - transInfo.acked = metadata.GetGcsTelemetryAcked(); - if (objInfo.event == EV_UPDATED - || objInfo.event == EV_UPDATED_MANUAL) { - transInfo.objRequest = false; - } else if (objInfo.event == EV_UPDATE_REQ) { - transInfo.objRequest = true; - } - // Start transaction - transPending = true; - } - // If this is a metaobject then make necessary telemetry updates // (this is why we catch unpack) if (objInfo.obj.isMetadata()) { @@ -830,35 +707,64 @@ public class Telemetry { updateObject(metaobj.getParentObject()); } - // 3. Execute transaction - if (transPending) { - try { + // 2. Setup transaction (skip if unpack event) + ObjectTransactionInfo newTrans = new ObjectTransactionInfo(); + boolean newTransactionPending = false; + if (objInfo.event != EV_UNPACKED) { + UAVObject.Metadata metadata = objInfo.obj.getMetadata(); + newTrans.obj = objInfo.obj; + newTrans.allInstances = objInfo.allInstances; + newTrans.retriesRemaining = MAX_RETRIES; + newTrans.acked = metadata.GetGcsTelemetryAcked(); + if (objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL) { + newTrans.objRequest = false; + } else if (objInfo.event == EV_UPDATE_REQ) { + newTrans.objRequest = true; + } + + // Determine if this will schedule a new transaction + newTransactionPending = !(newTrans.objRequest || newTrans.acked); + + // If there is a transaction pending and this would set up a new one reschedule it + if (transPending && newTransactionPending) { + if (WARN) Log.w(TAG, "Postponing transaction for" + newTrans.obj.getName()); + handler.postDelayed(this, 100); + return; + } + + synchronized (transInfo) { + transPending = newTransactionPending; + transInfo = newTrans; + if (DEBUG) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName()); - // Initiate transaction - if (transInfo.objRequest) { - if (DEBUG) Log.d(TAG, "Sending object request"); - utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); - } else { - if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName() + " " + transInfo.obj.toStringData()); - utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances); + // 3. Execute transaction + try { + + // Initiate transaction + if (transInfo.objRequest) { + if (DEBUG) Log.d(TAG, "Sending object request"); + utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); + } else { + if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName() + " " + transInfo.obj.toStringData()); + utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances); + } + + + // TODO: Block if request expected (??) + if (transPending) { + transTimerSetPeriod(REQ_TIMEOUT_MS); + } else if (transTimer != null) { + synchronized(transTimer) { + transTimer.cancel(); + transPending = false; + } + } + } catch (IOException e) { + // TODO Auto-generated catch block + Log.e(TAG, "E"); + e.printStackTrace(); } - - - // TODO: Block if request expected (??) - if (transInfo.objRequest || transInfo.acked ) { - transTimerSetPeriod(REQ_TIMEOUT_MS); - } else { - synchronized(transTimer) { - transTimer.cancel(); - transPending = false; - } - } - - } catch (IOException e) { - // TODO Auto-generated catch block - Log.e(TAG, "E"); - e.printStackTrace(); } } }