From b614368359ee2a8583463f4c5e7e5c173e74e65c Mon Sep 17 00:00:00 2001 From: James Cotton Date: Wed, 15 Aug 2012 01:14:57 -0500 Subject: [PATCH] AndroidGCS Telemetry: Use a runnable for the transaction timeout. Now all of telemetry is using handlers nicely, but we still can have multiple transactions queued for the same object. --- .../src/org/openpilot/uavtalk/Telemetry.java | 286 ++++++++---------- 1 file changed, 118 insertions(+), 168 deletions(-) diff --git a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java index e514e61c4..10885f746 100644 --- a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java +++ b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java @@ -126,7 +126,7 @@ public class Telemetry { ListIterator> li = objs.listIterator(); while (li.hasNext()) registerObject(li.next().get(0)); // we only need to register one - // instance per object type + // instance per object type // Listen to new object creations objMngr.addNewInstanceObserver(new Observer() { @@ -146,25 +146,15 @@ public class Telemetry { utalk.setOnTransactionCompletedListener(utalk.new OnTransactionCompletedListener() { @Override void TransactionSucceeded(UAVObject data) { - try { - transactionCompleted(data, true); - } catch (IOException e) { - // Disconnect when stream fails - utalk.setOnTransactionCompletedListener(null); - } + transactionCompleted(data, true); } @Override void TransactionFailed(UAVObject data) { - try { - if (DEBUG) - Log.d(TAG, "TransactionFailed(" + data.getName() + ")"); + if (DEBUG) + Log.d(TAG, "TransactionFailed(" + data.getName() + ")"); - transactionCompleted(data, false); - } catch (IOException e) { - // Disconnect when stream fails - utalk.setOnTransactionCompletedListener(null); - } + transactionCompleted(data, false); } }); @@ -182,28 +172,6 @@ public class Telemetry { txRetries = 0; } - synchronized void transTimerSetPeriod(int periodMs) { - if (transTimerTask != null) - transTimerTask.cancel(); - - if (transTimer != null) - transTimer.purge(); - - transTimer = new Timer(); - - transTimerTask = new TimerTask() { - @Override - public void run() { - try { - transactionTimeout(); - } catch (IOException e) { - cancel(); - } - } - }; - transTimer.schedule(transTimerTask, periodMs, periodMs); - } - synchronized void updateTimerSetPeriod(int periodMs) { if (updateTimer != null) { updateTimer.cancel(); @@ -354,7 +322,7 @@ public class Telemetry { eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ; if (obj.isMetadata()) eventMask |= EV_UNPACKED; // we also need to act on remote - // updates (unpack events) + // updates (unpack events) connectToObjectInstances(obj, eventMask); } else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_ONCHANGE) { @@ -364,7 +332,7 @@ public class Telemetry { eventMask = EV_UPDATED | EV_UPDATED_MANUAL | EV_UPDATE_REQ; if (obj.isMetadata()) eventMask |= EV_UNPACKED; // we also need to act on remote - // updates (unpack events) + // updates (unpack events) connectToObjectInstances(obj, eventMask); } else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_THROTTLED) { @@ -376,106 +344,12 @@ public class Telemetry { eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ; if (obj.isMetadata()) eventMask |= EV_UNPACKED; // we also need to act on remote - // updates (unpack events) + // updates (unpack events) connectToObjectInstances(obj, eventMask); } } - /** - * Called when a transaction is successfully completed (uavtalk event) - * - * @throws IOException - */ - private void transactionCompleted(UAVObject obj, boolean result) - throws IOException { - if (DEBUG) Log.d(TAG, "UAVTalk transactionCompleted"); - - // Check if there is a pending transaction and the objects match - if (transPending && transInfo.obj.getObjID() == obj.getObjID()) { - if (DEBUG) Log.d(TAG, "Telemetry: transaction completed for " + obj.getName()); - - // Complete transaction - synchronized(transTimer) { - transTimer.cancel(); - transPending = false; - } - - //Send signal - obj.transactionCompleted(result); - } else { - if (ERROR) Log.e(TAG, "Error: received a transaction completed when did not expect it."); - transPending = false; - } - } - - /** - * Called when a transaction is not completed within the timeout period - * (timer event) - * - * @throws IOException - */ - private void transactionTimeout() throws IOException { - if (DEBUG) - Log.d(TAG, "Telemetry: transaction timeout."); - synchronized (transTimer) { - transTimer.cancel(); - // Proceed only if there is a pending transaction - if (transPending) { - // Check if more retries are pending - if (transInfo.retriesRemaining > 0) { - --transInfo.retriesRemaining; - processObjectTransaction(); - ++txRetries; - } else { - if (ERROR) - Log.e(TAG, "Transaction failed for: " + transInfo.obj.getName()); - - // Terminate transaction. This triggers UAVTalk to send a transaction - // failed signal which will make the next queue entry be processed - // Note this is UAVTalk listener TransactionFailed function - // object specific transaction failed. - utalk.cancelPendingTransaction(transInfo.obj); - ++txErrors; - } - } - } - } - - /** - * Start an object transaction with UAVTalk, all information is stored in - * transInfo - * - * @throws IOException - */ - private void processObjectTransaction() throws IOException { - if (transPending) { - if (DEBUG) - Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName()); - - // Initiate transaction - if (transInfo.objRequest) { - utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); - } else { - utalk.sendObject(transInfo.obj, transInfo.acked, - transInfo.allInstances); - } - - // Start timer if a response is expected - if (transInfo.objRequest || transInfo.acked) { - transTimerSetPeriod(REQ_TIMEOUT_MS); - } else { - synchronized (transTimer) { - transTimer.cancel(); - transPending = false; - } - } - } else { - if (ERROR) - Log.e(TAG, "Error: inside of processObjectTransaction with no transPending"); - } - } - /** * Check is any objects are pending for periodic updates TODO: Clean-up * @@ -513,6 +387,8 @@ public class Telemetry { - offset; // Send object startTime = System.currentTimeMillis(); + + if (DEBUG) Log.d(TAG, "Manual update: " + objinfo.obj.getName()); handler.updatedManual(objinfo.obj); // enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, // true, false); @@ -584,12 +460,6 @@ public class Telemetry { if (updateTimer != null) updateTimer.cancel(); updateTimer = null; - if (transTimerTask != null) - transTimerTask.cancel(); - transTimerTask = null; - if (transTimer != null) - transTimer.cancel(); - transTimer = null; } /** @@ -604,8 +474,6 @@ public class Telemetry { private Timer updateTimer; private TimerTask updateTimerTask; - private Timer transTimer; - private TimerTask transTimerTask; private int timeToNextUpdateMs; private int txErrors; @@ -664,14 +532,15 @@ public class Telemetry { } + /** + * Perform an update on an object where on an event based on the contents provided + * to the constructors. This update will also set a timeout for transaction failure. + */ class ObjectRunnable implements Runnable { // ! Transaction information to perform private final ObjectQueueInfo objInfo; - // private final ObjectTransactionInfo transInfo = new - // ObjectTransactionInfo(); - ObjectRunnable(ObjectQueueInfo info) { Assert.assertNotNull(info); objInfo = info; @@ -725,48 +594,129 @@ public class Telemetry { // Determine if this will schedule a new transaction newTransactionPending = !(newTrans.objRequest || newTrans.acked); - // If there is a transaction pending and this would set up a new one reschedule it - if (transPending && newTransactionPending) { - if (WARN) Log.w(TAG, "Postponing transaction for" + newTrans.obj.getName()); - handler.postDelayed(this, 100); - return; - } - synchronized (transInfo) { + + // If there is a transaction pending and this would set up a new one reschedule it + if (transPending && newTransactionPending) { + if (WARN) Log.w(TAG, "Postponing transaction for" + newTrans.obj.getName()); + handler.postDelayed(this, 100); + return; + } + + // Store this as the active transaction transPending = newTransactionPending; transInfo = newTrans; if (DEBUG) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName()); - // 3. Execute transaction try { - // Initiate transaction + // 3. Execute transaction by sending the appropriate UAVTalk command if (transInfo.objRequest) { - if (DEBUG) Log.d(TAG, "Sending object request"); + if (DEBUG) Log.d(TAG, "Sending object request" + transInfo.obj.getName()); utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); } else { - if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName() + " " + transInfo.obj.toStringData()); + if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName()); utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances); } - - // TODO: Block if request expected (??) - if (transPending) { - transTimerSetPeriod(REQ_TIMEOUT_MS); - } else if (transTimer != null) { - synchronized(transTimer) { - transTimer.cancel(); - transPending = false; - } - } } catch (IOException e) { - // TODO Auto-generated catch block - Log.e(TAG, "E"); + if (ERROR) Log.e(TAG, "Unable to send UAVTalk message"); e.printStackTrace(); } + + // Post a timeout timer if a response is epxected + if (transPending) + handler.postDelayed(transactionTimeout, REQ_TIMEOUT_MS); } } } } + + + /** + * Runnable posted to handle a timeout of a transaction. Tracks the number of retry attempts + * retries that many, and finally sends a transaction failed signal. + */ + final Runnable transactionTimeout = new Runnable() { + @Override + public void run() { + // Lock on the transaction + synchronized (transInfo) { + + // Proceed only if there is a pending transaction + if (!transPending) { + if (WARN) Log.w(TAG,"Transaction completed but timeout still called. Probable race condition"); + return; + } + + if (DEBUG) Log.d(TAG, "Telemetry: transaction timeout."); + + // Check if more retries are pending + if (transInfo.retriesRemaining > 0) { + --transInfo.retriesRemaining; + + // Repeat whatever is required for this transaction type + // (transInfo.objRequest) { + if (DEBUG) Log.d(TAG, "Sending object request"); + + try { + // Execute transaction by sending the appropriate UAVTalk command + if (transInfo.objRequest) { + if (DEBUG) Log.d(TAG, "Sending object request" + transInfo.obj.getName()); + utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); + } else { + if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName()); + utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances); + } + } catch (IOException e) { + if (ERROR) Log.e(TAG, "Unable to send UAVTalk message"); + e.printStackTrace(); + } + + handler.postDelayed(transactionTimeout, REQ_TIMEOUT_MS); + + ++txRetries; + } else { + if (ERROR) Log.e(TAG, "Transaction failed for: " + transInfo.obj.getName()); + + // Terminate transaction. This triggers UAVTalk to send a transaction + // failed signal which will make the next queue entry be processed + // Note this is UAVTalk listener TransactionFailed function + // object specific transaction failed. + utalk.cancelPendingTransaction(transInfo.obj); + ++txErrors; + } + } + } + }; + + + /** + * Called when a transaction is successfully completed (UAVTalk event) and maps that to + * the appropriate object event as well as canceling the pending transaction and timeout + */ + private void transactionCompleted(UAVObject obj, boolean result) { + + if (DEBUG) Log.d(TAG, "UAVTalk transactionCompleted"); + + // Check if there is a pending transaction and the objects match + synchronized(transInfo) { + if (transPending && transInfo.obj.getObjID() == obj.getObjID()) { + if (DEBUG) Log.d(TAG, "Telemetry: transaction completed for " + obj.getName()); + + // Cancel timeout and complete transaction + handler.removeCallbacks(transactionTimeout); + transPending = false; + + //Send signal + obj.transactionCompleted(result); + } else { + if (ERROR) Log.e(TAG, "Error: received a transaction completed when did not expect it."); + transPending = false; + } + } + } + } +