From 2d7bb4d3bbbdcb1b8b0ee2aa5dc9aef6d2ba0e73 Mon Sep 17 00:00:00 2001 From: James Cotton Date: Mon, 13 Aug 2012 15:02:15 -0500 Subject: [PATCH] AndroidGCS: Start moving the telemetry object queue to a handler and a looper --- .../telemetry/OPTelemetryService.java | 15 +- .../androidgcs/telemetry/TelemetryTask.java | 2 +- .../src/org/openpilot/uavtalk/Telemetry.java | 238 ++++++++++++------ .../uavtalk/TelemetryMonitorTest.java | 20 +- 4 files changed, 176 insertions(+), 99 deletions(-) diff --git a/androidgcs/src/org/openpilot/androidgcs/telemetry/OPTelemetryService.java b/androidgcs/src/org/openpilot/androidgcs/telemetry/OPTelemetryService.java index 4eeb57abe..4698900ef 100644 --- a/androidgcs/src/org/openpilot/androidgcs/telemetry/OPTelemetryService.java +++ b/androidgcs/src/org/openpilot/androidgcs/telemetry/OPTelemetryService.java @@ -26,16 +26,10 @@ */ package org.openpilot.androidgcs.telemetry; -import java.io.IOException; import java.lang.ref.WeakReference; -import java.util.Observable; -import java.util.Observer; -import org.openpilot.uavtalk.Telemetry; -import org.openpilot.uavtalk.TelemetryMonitor; import org.openpilot.uavtalk.UAVDataObject; import org.openpilot.uavtalk.UAVObjectManager; -import org.openpilot.uavtalk.UAVTalk; import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize; import android.app.Service; @@ -127,7 +121,7 @@ public class OPTelemetryService extends Service { break; case 2: Toast.makeText(getApplicationContext(), "Attempting BT connection", Toast.LENGTH_SHORT).show(); - activeTelem = new BTTelemetryThread(); + //activeTelem = new BTTelemetryThread(); break; case 3: Toast.makeText(getApplicationContext(), "Attempting TCP connection", Toast.LENGTH_SHORT).show(); @@ -347,6 +341,8 @@ public class OPTelemetryService extends Service { } } } + + /* private class BTTelemetryThread extends Thread implements TelemTask { private final UAVObjectManager objMngr; @@ -401,7 +397,7 @@ public class OPTelemetryService extends Service { @Override public void update(Observable arg0, Object arg1) { if (DEBUG) Log.d(TAG, "Mon updated. Connected: " + mon.getConnected() + " objects updated: " + mon.getObjectsUpdated()); - if(mon.getConnected() /*&& mon.getObjectsUpdated()*/) { + if(mon.getConnected() ) { Intent intent = new Intent(); intent.setAction(INTENT_ACTION_CONNECTED); sendBroadcast(intent,null); @@ -423,6 +419,5 @@ public class OPTelemetryService extends Service { } if (DEBUG) Log.d(TAG, "UAVTalk stream disconnected"); } - - }; + };*/ } diff --git a/androidgcs/src/org/openpilot/androidgcs/telemetry/TelemetryTask.java b/androidgcs/src/org/openpilot/androidgcs/telemetry/TelemetryTask.java index f09cc7236..3d16d94f9 100644 --- a/androidgcs/src/org/openpilot/androidgcs/telemetry/TelemetryTask.java +++ b/androidgcs/src/org/openpilot/androidgcs/telemetry/TelemetryTask.java @@ -105,7 +105,7 @@ public abstract class TelemetryTask implements Runnable { // Create the required telemetry objects attached to this // data stream uavTalk = new UAVTalk(inStream, outStream, objMngr); - tel = new Telemetry(uavTalk, objMngr); + tel = new Telemetry(uavTalk, objMngr, Looper.myLooper()); mon = new TelemetryMonitor(objMngr,tel, telemService); // Create an observer to notify system of connection diff --git a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java index 8e6161074..ac814e76b 100644 --- a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java +++ b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java @@ -35,9 +35,17 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentLinkedQueue; +import junit.framework.Assert; +import android.os.Handler; +import android.os.Looper; import android.util.Log; public class Telemetry { + /** + * Telemetry provides a messaging handler to handle all the object updates and transfer + * requests. This handler can either be attached to a new loop attached to the thread + * started by the telemetry service. + */ private final String TAG = "Telemetry"; public static int LOGLEVEL = 1; @@ -98,11 +106,14 @@ public class Telemetry { /** * Constructor */ - public Telemetry(UAVTalk utalkIn, UAVObjectManager objMngr) + public Telemetry(UAVTalk utalkIn, UAVObjectManager objMngr, Looper l) { this.utalk = utalkIn; this.objMngr = objMngr; + // Create a handler for object messages + handler = new ObjectUpdateHandler(l); + // Process all objects in the list List< List > objs = objMngr.getObjects(); ListIterator> li = objs.listIterator(); @@ -265,48 +276,28 @@ public class Telemetry { final Observer unpackedObserver = new Observer() { @Override public void update(Observable observable, Object data) { - try { - enqueueObjectUpdates((UAVObject) data, EV_UNPACKED, false, true); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + handler.unpacked((UAVObject) data); } }; final Observer updatedAutoObserver = new Observer() { @Override public void update(Observable observable, Object data) { - try { - enqueueObjectUpdates((UAVObject) data, EV_UPDATED, false, true); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + handler.updatedAuto((UAVObject) data); } }; final Observer updatedManualObserver = new Observer() { @Override public void update(Observable observable, Object data) { - try { - enqueueObjectUpdates((UAVObject) data, EV_UPDATED_MANUAL, false, true); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + handler.updatedManual((UAVObject) data); } }; final Observer updatedRequestedObserver = new Observer() { @Override public void update(Observable observable, Object data) { - try { - enqueueObjectUpdates((UAVObject) data, EV_UPDATE_REQ, false, true); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + handler.updateRequested((UAVObject) data); } }; @@ -488,59 +479,6 @@ public class Telemetry { } } - /** - * Enqueue the event received from an object. This is the main method that handles all the callbacks - * from UAVObjects (due to updates, or update requests) - */ - private void enqueueObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) throws IOException - { - // Push event into queue - if (DEBUG) Log.d(TAG, "Push event into queue for obj " + obj.getName() + " event " + event); - if(event == 8 && obj.getName().compareTo("GCSTelemetryStats") == 0) - Thread.dumpStack(); - ObjectQueueInfo objInfo = new ObjectQueueInfo(); - objInfo.obj = obj; - objInfo.event = event; - objInfo.allInstances = allInstances; - if (priority) - { - // Only enqueue if an identical transaction does not already exist - if(!objPriorityQueue.contains(objInfo)) { - if ( objPriorityQueue.size() < MAX_QUEUE_SIZE ) - { - objPriorityQueue.add(objInfo); - } - else - { - ++txErrors; - obj.transactionCompleted(false); - Log.w(TAG,"Telemetry: priority event queue is full, event lost " + obj.getName()); - } - } - } - else - { - // Only enqueue if an identical transaction does not already exist - if(!objQueue.contains(objInfo)) { - if ( objQueue.size() < MAX_QUEUE_SIZE ) - { - objQueue.add(objInfo); - } - else - { - ++txErrors; - obj.transactionCompleted(false); - } - } - } - - // If there is no transaction in progress then process event - if (!transPending) - { - processObjectQueue(); - } - } - /** * Process events from the object queue * @throws IOException @@ -666,7 +604,8 @@ public class Telemetry { objinfo.timeToNextUpdateMs = objinfo.updatePeriodMs - offset; // Send object startTime = System.currentTimeMillis(); - enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false); + handler.updatedManual(objinfo.obj); + //enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false); elapsedMs = (int) (System.currentTimeMillis() - startTime); // Update timeToNextUpdateMs with the elapsed delay of sending the object; timeToNextUpdateMs += elapsedMs; @@ -780,6 +719,147 @@ public class Telemetry { private static final int MIN_UPDATE_PERIOD_MS = 1; private static final int MAX_QUEUE_SIZE = 20; + private final ObjectUpdateHandler handler; + public class ObjectUpdateHandler extends Handler { + //! This can only be created while attaching to a particular looper + ObjectUpdateHandler(Looper l) { + super(l); + } + + //! Generic enqueue + void enqueueObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) { + + if (DEBUG) Log.d(TAG, "Enqueing update " + obj.getName() + " event " + event); + + ObjectQueueInfo objInfo = new ObjectQueueInfo(); + objInfo.obj = obj; + objInfo.event = event; + objInfo.allInstances = allInstances; + + post(new ObjectRunnable(objInfo)); + } + + //! Enqueue an unpacked event + void unpacked(UAVObject obj) { + enqueueObjectUpdates(obj, EV_UNPACKED, false, true); + } + + //! Enqueue an updated auto event + void updatedAuto(UAVObject obj) { + enqueueObjectUpdates(obj,EV_UPDATED, false, true); + } + + //! Enqueue an updated manual event + void updatedManual(UAVObject obj) { + enqueueObjectUpdates(obj, EV_UPDATE_REQ, false, true); + } + + //! Enqueue an update requested event + void updateRequested(UAVObject obj) { + enqueueObjectUpdates(obj, EV_UPDATE_REQ, false, true); + } + + } + + class ObjectRunnable implements Runnable { + + //! Transaction information to perform + private final ObjectQueueInfo objInfo; +// private final ObjectTransactionInfo transInfo = new ObjectTransactionInfo(); + + ObjectRunnable(ObjectQueueInfo info) { + Assert.assertNotNull(info); + objInfo = info; + } + + //! Perform the transaction on the looper thread + @Override + public void run () { + Log.d(TAG,"object transaction running"); + // 1. Check GCS is connected, throw this out if not + // 2. Set up a transaction which includes multiple retries, whether to wait for ack etc + // 3. Send UAVTalk message + // 4. Based on transaction type either wait for update or end + + // 1. Check if a connection has been established, only process GCSTelemetryStats updates + // (used to establish the connection) + gcsStatsObj = objMngr.getObject("GCSTelemetryStats"); + if ( ((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0 ) + { + if ( objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID() ) + { + if (DEBUG) Log.d(TAG,"transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected."); + objInfo.obj.transactionCompleted(false); + return; + } + } + + Log.e(TAG, "A"); + // 2. Setup transaction (skip if unpack event) + if ( objInfo.event != EV_UNPACKED ) + { + Log.e(TAG, "A1"); + UAVObject.Metadata metadata = objInfo.obj.getMetadata(); + transInfo.obj = objInfo.obj; + transInfo.allInstances = objInfo.allInstances; + transInfo.retriesRemaining = MAX_RETRIES; + transInfo.acked = metadata.GetGcsTelemetryAcked(); + if ( objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL ) + { + transInfo.objRequest = false; + } + else if ( objInfo.event == EV_UPDATE_REQ ) + { + transInfo.objRequest = true; + } + // Start transaction + transPending = true; + } + Log.e(TAG, "B"); + // If this is a metaobject then make necessary telemetry updates (this is why we catch unpack) + if (objInfo.obj.isMetadata()) + { + UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj; + updateObject( metaobj.getParentObject() ); + } + Log.e(TAG, "C"); + // 3. Execute transaction + if (transPending) + { + Log.e(TAG, "D"); + try { + if (DEBUG || true) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName()); + // Initiate transaction + if (transInfo.objRequest) + { + utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); + } + else + { + Log.d(TAG, "Sending object"); + utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances); + } + + // TODO: Block if request expected (??) + if ( transInfo.objRequest || transInfo.acked ) + { + transTimerSetPeriod(REQ_TIMEOUT_MS); + } + else + { + synchronized(transTimer) { + transTimer.cancel(); + transPending = false; + } + } + } catch (IOException e) { + // TODO Auto-generated catch block + Log.e(TAG, "E"); + e.printStackTrace(); + } + } + } + } } diff --git a/androidgcs/tests/org/openpilot/uavtalk/TelemetryMonitorTest.java b/androidgcs/tests/org/openpilot/uavtalk/TelemetryMonitorTest.java index 0b0cc1d73..6ddcfe03e 100644 --- a/androidgcs/tests/org/openpilot/uavtalk/TelemetryMonitorTest.java +++ b/androidgcs/tests/org/openpilot/uavtalk/TelemetryMonitorTest.java @@ -1,6 +1,6 @@ package org.openpilot.uavtalk; -import static org.junit.Assert.*; +import static org.junit.Assert.fail; import java.io.IOException; import java.net.InetAddress; @@ -8,11 +8,12 @@ import java.net.Socket; import org.junit.Test; import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize; -import org.openpilot.uavtalk.UAVTalk; + +import android.os.Looper; public class TelemetryMonitorTest { - + static UAVObjectManager objMngr; static UAVTalk talk; static final String IP_ADDRDESS = new String("127.0.0.1"); @@ -32,7 +33,7 @@ public class TelemetryMonitorTest { e.printStackTrace(); fail("Couldn't connect to test platform"); } - + try { talk = new UAVTalk(connection.getInputStream(), connection.getOutputStream(), objMngr); } catch (IOException e) { @@ -40,16 +41,17 @@ public class TelemetryMonitorTest { e.printStackTrace(); fail("Couldn't construct UAVTalk object"); } - + Thread inputStream = talk.getInputProcessThread(); inputStream.start(); - - Telemetry tel = new Telemetry(talk, objMngr); + + Looper.prepare(); + Telemetry tel = new Telemetry(talk, objMngr, Looper.myLooper()); @SuppressWarnings("unused") TelemetryMonitor mon = new TelemetryMonitor(objMngr,tel); - + Thread.sleep(10000); - + System.out.println("Done"); }