diff --git a/androidgcs/src/org/openpilot/androidgcs/TcpUAVTalk.java b/androidgcs/src/org/openpilot/androidgcs/TcpUAVTalk.java index 1f647bd99..cb21abb94 100644 --- a/androidgcs/src/org/openpilot/androidgcs/TcpUAVTalk.java +++ b/androidgcs/src/org/openpilot/androidgcs/TcpUAVTalk.java @@ -24,13 +24,19 @@ public class TcpUAVTalk { private UAVTalk uavTalk; private boolean connected; + private Socket socket; + /** + * Construct a TcpUAVTalk object attached to the OPTelemetryService. Gets the + * connection settings from the preferences. + */ public TcpUAVTalk(Context caller) { SharedPreferences prefs = PreferenceManager.getDefaultSharedPreferences(caller); ip_address = prefs.getString("ip_address","127.0.0.1"); try { port = Integer.decode(prefs.getString("port", "")); } catch (NumberFormatException e) { + //TODO: Handle this exception } if (DEBUG) Log.d(TAG, "Trying to open UAVTalk with " + ip_address); @@ -38,6 +44,10 @@ public class TcpUAVTalk { connected = false; } + /** + * Connect a TCP object to an object manager. Returns true if already + * connected, otherwise returns true if managed a successful socket. + */ public boolean connect(UAVObjectManager objMngr) { if( getConnected() ) return true; @@ -45,6 +55,16 @@ public class TcpUAVTalk { return false; return true; } + + public void disconnect() { + try { + socket.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + socket = null; + } public boolean getConnected() { return connected; @@ -54,7 +74,10 @@ public class TcpUAVTalk { return uavTalk; } - + /** + * Opens a TCP socket to the address determined on construction. If successful + * creates a UAVTalk stream connection this socket to the passed in object manager + */ private boolean openTelemetryTcp(UAVObjectManager objMngr) { Log.d(TAG, "Opening connection to " + ip_address + " at address " + port); @@ -66,13 +89,11 @@ public class TcpUAVTalk { e1.printStackTrace(); return false; } - - Socket socket = null; + + socket = null; try { socket = new Socket(serverAddr,port); } catch (IOException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); return false; } diff --git a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java index 16a527088..06f820c85 100644 --- a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java +++ b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java @@ -1,5 +1,6 @@ package org.openpilot.uavtalk; +import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -88,7 +89,12 @@ public class Telemetry { // Listen to transaction completions utalk.addObserver(new Observer() { public void update(Observable observable, Object data) { - transactionCompleted((UAVObject) data); + try { + transactionCompleted((UAVObject) data); + } catch (IOException e) { + // Disconnect when stream fails + observable.deleteObserver(this); + } } }); @@ -117,18 +123,35 @@ public class Telemetry { transTimerTask = new TimerTask() { @Override public void run() { - transactionTimeout(); + try { + transactionTimeout(); + } catch (IOException e) { + cancel(); + } } }; transTimer.schedule(transTimerTask, periodMs, periodMs); } synchronized void updateTimerSetPeriod(int periodMs) { + if (updateTimer != null) { + updateTimer.cancel(); + updateTimer = null; + } + if (updateTimerTask != null) { + updateTimerTask.cancel(); + updateTimerTask = null; + } updateTimer = new Timer(); updateTimerTask = new TimerTask() { @Override public void run() { - processPeriodicUpdates(); + try { + processPeriodicUpdates(); + } catch (IOException e) { + updateTimerTask.cancel(); + updateTimer.cancel(); + } } }; updateTimer.schedule(updateTimerTask, periodMs, periodMs); @@ -206,7 +229,12 @@ public class Telemetry { { obj.addUnpackedObserver(new Observer() { public void update(Observable observable, Object data) { - objectUnpacked( (UAVObject) data); + try { + objectUnpacked( (UAVObject) data); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } }); } @@ -214,7 +242,12 @@ public class Telemetry { { obj.addUpdatedAutoObserver(new Observer() { public void update(Observable observable, Object data) { - objectUpdatedAuto( (UAVObject) data); + try { + objectUpdatedAuto( (UAVObject) data); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } }); } @@ -222,7 +255,12 @@ public class Telemetry { { obj.addUpdatedManualObserver(new Observer() { public void update(Observable observable, Object data) { - objectUpdatedManual( (UAVObject) data); + try { + objectUpdatedManual( (UAVObject) data); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } }); } @@ -230,7 +268,12 @@ public class Telemetry { { obj.addUpdateRequestedObserver(new Observer() { public void update(Observable observable, Object data) { - updateRequested( (UAVObject) data); + try { + updateRequested( (UAVObject) data); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } }); } @@ -288,8 +331,9 @@ public class Telemetry { /** * Called when a transaction is successfully completed (uavtalk event) + * @throws IOException */ - private synchronized void transactionCompleted(UAVObject obj) + private synchronized void transactionCompleted(UAVObject obj) throws IOException { if (DEBUG) Log.d(TAG,"UAVTalk transactionCompleted"); // Check if there is a pending transaction and the objects match @@ -311,8 +355,9 @@ public class Telemetry { /** * Called when a transaction is not completed within the timeout period (timer event) + * @throws IOException */ - private synchronized void transactionTimeout() + private synchronized void transactionTimeout() throws IOException { if (DEBUG) Log.d(TAG,"Telemetry: transaction timeout."); transTimer.cancel(); @@ -342,8 +387,9 @@ public class Telemetry { /** * Start an object transaction with UAVTalk, all information is stored in transInfo + * @throws IOException */ - private synchronized void processObjectTransaction() + private synchronized void processObjectTransaction() throws IOException { if (transPending) { @@ -375,8 +421,9 @@ public class Telemetry { /** * Process the event received from an object + * @throws IOException */ - private synchronized void processObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) + private synchronized void processObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) throws IOException { // Push event into queue if (DEBUG) Log.d(TAG, "Push event into queue for obj " + obj.getName() + " event " + event); @@ -397,6 +444,7 @@ public class Telemetry { ++txErrors; obj.transactionCompleted(false); Log.w(TAG,"Telemetry: priority event queue is full, event lost " + obj.getName()); + new Exception().printStackTrace(); } } else @@ -421,8 +469,9 @@ public class Telemetry { /** * Process events from the object queue + * @throws IOException */ - private synchronized void processObjectQueue() + private synchronized void processObjectQueue() throws IOException { if (DEBUG) Log.d(TAG, "Process object queue - Depth " + objQueue.size() + " priority " + objPriorityQueue.size()); @@ -505,8 +554,9 @@ public class Telemetry { /** * Check is any objects are pending for periodic updates * TODO: Clean-up + * @throws IOException */ - private synchronized void processPeriodicUpdates() + private synchronized void processPeriodicUpdates() throws IOException { if (DEBUG) Log.d(TAG, "processPeriodicUpdates()"); @@ -590,22 +640,22 @@ public class Telemetry { txRetries = 0; } - private synchronized void objectUpdatedAuto(UAVObject obj) + private synchronized void objectUpdatedAuto(UAVObject obj) throws IOException { processObjectUpdates(obj, EV_UPDATED, false, true); } - private synchronized void objectUpdatedManual(UAVObject obj) + private synchronized void objectUpdatedManual(UAVObject obj) throws IOException { processObjectUpdates(obj, EV_UPDATED_MANUAL, false, true); } - private synchronized void objectUnpacked(UAVObject obj) + private synchronized void objectUnpacked(UAVObject obj) throws IOException { processObjectUpdates(obj, EV_UNPACKED, false, true); } - public synchronized void updateRequested(UAVObject obj) + public synchronized void updateRequested(UAVObject obj) throws IOException { processObjectUpdates(obj, EV_UPDATE_REQ, false, true); } @@ -620,6 +670,25 @@ public class Telemetry { registerObject(obj); } + /** + * Stop all the telemetry timers + */ + public void stopTelemetry() + { + if (updateTimerTask != null) + updateTimerTask.cancel(); + updateTimerTask = null; + if (updateTimer != null) + updateTimer.cancel(); + updateTimer = null; + if (transTimerTask != null) + transTimerTask.cancel(); + transTimerTask = null; + if (transTimer != null) + transTimer.cancel(); + transTimer = null; + } + /** * Private variables */ diff --git a/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java b/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java index 5f80ff779..9990bb308 100644 --- a/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java +++ b/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java @@ -1,5 +1,6 @@ package org.openpilot.uavtalk; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; @@ -50,7 +51,14 @@ public class TelemetryMonitor extends Observable{ flightStatsObj.addUpdatedObserver(new Observer() { public void update(Observable observable, Object data) { - flightStatsUpdated((UAVObject) data); + try { + flightStatsUpdated((UAVObject) data); + } catch (IOException e) { + // The UAVTalk stream was broken, disconnect this signal + // TODO: Should this actually be disconnected. Do we create a new TelemetryMonitor for this + // or fix the stream? + flightStatsObj.removeUpdatedObserver(this); + } } }); @@ -60,8 +68,9 @@ public class TelemetryMonitor extends Observable{ /** * Initiate object retrieval, initialize queue with objects to be retrieved. + * @throws IOException */ - public synchronized void startRetrievingObjects() + public synchronized void startRetrievingObjects() throws IOException { if (DEBUG) Log.d(TAG, "Start retrieving objects"); @@ -112,8 +121,9 @@ public class TelemetryMonitor extends Observable{ /** * Retrieve the next object in the queue + * @throws IOException */ - public synchronized void retrieveNextObject() + public synchronized void retrieveNextObject() throws IOException { // If queue is empty return if ( queue.isEmpty() ) @@ -133,11 +143,18 @@ public class TelemetryMonitor extends Observable{ if (DEBUG) Log.d(TAG, "Retrieving object: " + obj.getName()) ; // Connect to object + + // TODO: Does this need to stay here permanently? This appears to be used for setup mainly obj.addTransactionCompleted(new Observer() { public void update(Observable observable, Object data) { UAVObject.TransactionResult result = (UAVObject.TransactionResult) data; if (DEBUG) Log.d(TAG,"Got transaction completed event from " + result.obj.getName() + " status: " + result.success); - transactionCompleted(result.obj, result.success); + try { + transactionCompleted(result.obj, result.success); + } catch (IOException e) { + // When the telemetry stream is broken disconnect these updates + observable.deleteObserver(this); + } } }); @@ -148,8 +165,9 @@ public class TelemetryMonitor extends Observable{ /** * Called by the retrieved object when a transaction is completed. + * @throws IOException */ - public synchronized void transactionCompleted(UAVObject obj, boolean success) + public synchronized void transactionCompleted(UAVObject obj, boolean success) throws IOException { //QMutexLocker locker(mutex); // Disconnect from sending object @@ -176,8 +194,9 @@ public class TelemetryMonitor extends Observable{ /** * Called each time the flight stats object is updated by the autopilot + * @throws IOException */ - public synchronized void flightStatsUpdated(UAVObject obj) + public synchronized void flightStatsUpdated(UAVObject obj) throws IOException { // Force update if not yet connected gcsStatsObj = objMngr.getObject("GCSTelemetryStats"); @@ -193,8 +212,9 @@ public class TelemetryMonitor extends Observable{ /** * Called periodically to update the statistics and connection status. + * @throws IOException */ - public synchronized void processStatsUpdates() + public synchronized void processStatsUpdates() throws IOException { // Get telemetry stats if (DEBUG) Log.d(TAG, "processStatsUpdates()"); @@ -313,9 +333,20 @@ public class TelemetryMonitor extends Observable{ periodicTask.scheduleAtFixedRate(new TimerTask() { @Override public void run() { - processStatsUpdates(); + try { + processStatsUpdates(); + } catch (IOException e) { + // Once the stream has died stop trying to process these updates + periodicTask.cancel(); + } } }, currentPeriod, currentPeriod); } + + public void stopMonitor() + { + periodicTask.cancel(); + periodicTask = null; + } } diff --git a/androidgcs/src/org/openpilot/uavtalk/UAVObject.java b/androidgcs/src/org/openpilot/uavtalk/UAVObject.java index cf195799f..39769b29f 100644 --- a/androidgcs/src/org/openpilot/uavtalk/UAVObject.java +++ b/androidgcs/src/org/openpilot/uavtalk/UAVObject.java @@ -45,6 +45,11 @@ public abstract class UAVObject { transactionCompletedListeners.addObserver(o); } } + public void removeTransactionCompleted(Observer o) { + synchronized(transactionCompletedListeners) { + transactionCompletedListeners.deleteObserver(o); + } + } void transactionCompleted(boolean status) { synchronized(transactionCompletedListeners) { transactionCompletedListeners.event(new TransactionResult(this,status)); diff --git a/androidgcs/src/org/openpilot/uavtalk/UAVTalk.java b/androidgcs/src/org/openpilot/uavtalk/UAVTalk.java index e75724c08..003f7bccc 100644 --- a/androidgcs/src/org/openpilot/uavtalk/UAVTalk.java +++ b/androidgcs/src/org/openpilot/uavtalk/UAVTalk.java @@ -17,11 +17,12 @@ public class UAVTalk extends Observable { public static boolean DEBUG = LOGLEVEL > 0; private Thread inputProcessingThread = null; + + private boolean streamFailed = false; /** - * A reference to the thread for processing the incoming stream - * - * @return + * A reference to the thread for processing the incoming stream. Currently this method is ONLY + * used for unit testing */ public Thread getInputProcessThread() { if (inputProcessingThread == null) @@ -29,8 +30,13 @@ public class UAVTalk extends Observable { inputProcessingThread = new Thread() { public void run() { while(true) { - if( !processInputStream() ) - break; + try { + if( !processInputStream() ) + break; + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } } }; @@ -175,18 +181,14 @@ public class UAVTalk extends Observable { /** * Process any data in the queue + * @throws IOException */ - public boolean processInputStream() { + public boolean processInputStream() throws IOException { int val; - try { - // inStream.wait(); - val = inStream.read(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - return false; - } + //inStream.wait(); + val = inStream.read(); + if (val == -1) { return false; } @@ -201,8 +203,9 @@ public class UAVTalk extends Observable { * would have been updated by the GCS. \param[in] obj Object to update * \param[in] allInstances If set true then all instances will be updated * \return Success (true), Failure (false) + * @throws IOException */ - public boolean sendObjectRequest(UAVObject obj, boolean allInstances) { + public boolean sendObjectRequest(UAVObject obj, boolean allInstances) throws IOException { // QMutexLocker locker(mutex); return objectTransaction(obj, TYPE_OBJ_REQ, allInstances); } @@ -212,9 +215,10 @@ public class UAVTalk extends Observable { * Object to send \param[in] acked Selects if an ack is required \param[in] * allInstances If set true then all instances will be updated \return * Success (true), Failure (false) + * @throws IOException */ public synchronized boolean sendObject(UAVObject obj, boolean acked, - boolean allInstances) { + boolean allInstances) throws IOException { if (acked) { return objectTransaction(obj, TYPE_OBJ_ACK, allInstances); } else { @@ -235,9 +239,10 @@ public class UAVTalk extends Observable { * request object update TYPE_OBJ_ACK: send object with an ack \param[in] * allInstances If set true then all instances will be updated \return * Success (true), Failure (false) + * @throws IOException */ public boolean objectTransaction(UAVObject obj, int type, - boolean allInstances) { + boolean allInstances) throws IOException { // Send object depending on if a response is needed if (type == TYPE_OBJ_ACK || type == TYPE_OBJ_REQ) { if (transmitObject(obj, type, allInstances)) { @@ -257,8 +262,9 @@ public class UAVTalk extends Observable { /** * Process an byte from the telemetry stream. \param[in] rxbyte Received * byte \return Success (true), Failure (false) + * @throws IOException */ - public synchronized boolean processInputByte(int rxbyte) { + public synchronized boolean processInputByte(int rxbyte) throws IOException { assert (objMngr != null); // Update stats @@ -471,9 +477,10 @@ public class UAVTalk extends Observable { * received object \param[in] instId The instance ID of UAVOBJ_ALL_INSTANCES * for all instances. \param[in] data Data buffer \param[in] length Buffer * length \return Success (true), Failure (false) + * @throws IOException */ public boolean receiveObject(int type, long objId, long instId, - ByteBuffer data) { + ByteBuffer data) throws IOException { if (DEBUG) Log.d(TAG, "Received object ID: " + objId); assert (objMngr != null); @@ -622,8 +629,9 @@ public class UAVTalk extends Observable { * @param[in] type Transaction type * @param[in] allInstances True is all instances of the object are to be sent * @return Success (true), Failure (false) + * @throws IOException */ - public synchronized boolean transmitObject(UAVObject obj, int type, boolean allInstances) { + public synchronized boolean transmitObject(UAVObject obj, int type, boolean allInstances) throws IOException { // If all instances are requested on a single instance object it is an // error if (allInstances && obj.isSingleInstance()) { @@ -658,12 +666,13 @@ public class UAVTalk extends Observable { } /** - * Send an object through the telemetry link. \param[in] obj Object handle - * to send \param[in] type Transaction type \return Success (true), Failure - * (false) + * Send an object through the telemetry link. + * @throws IOException + * @param[in] obj Object handle to send + * @param[in] type Transaction type \return Success (true), Failure (false) */ public synchronized boolean transmitSingleObject(UAVObject obj, int type, - boolean allInstances) { + boolean allInstances) throws IOException { int length; int allInstId = ALL_INSTANCES; @@ -682,8 +691,7 @@ public class UAVTalk extends Observable { // Setup type and object id fields bbuf.put((byte) (SYNC_VAL & 0xff)); bbuf.put((byte) (type & 0xff)); - bbuf - .putShort((short) (length + 2 /* SYNC, Type */+ 2 /* Size */+ 4 /* ObjID */+ (obj + bbuf.putShort((short) (length + 2 /* SYNC, Type */+ 2 /* Size */+ 4 /* ObjID */+ (obj .isSingleInstance() ? 0 : 2))); bbuf.putInt((int)obj.getObjID()); @@ -714,31 +722,12 @@ public class UAVTalk extends Observable { // Calculate checksum bbuf.put((byte) (updateCRC(0, bbuf.array(), bbuf.position()) & 0xff)); - try { - int packlen = bbuf.position(); - bbuf.position(0); - byte[] dst = new byte[packlen]; - bbuf.get(dst, 0, packlen); - outStream.write(dst); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - return false; - } + int packlen = bbuf.position(); + bbuf.position(0); + byte[] dst = new byte[packlen]; + bbuf.get(dst, 0, packlen); + outStream.write(dst); - // //TODO: Need to use a different outStream type and check that the - // backlog isn't more than TX_BUFFER_SIZE - // // Send buffer, check that the transmit backlog does not grow above - // limit - // if ( io->bytesToWrite() < TX_BUFFER_SIZE ) - // { - // io->write((const char*)txBuffer, dataOffset+length+CHECKSUM_LENGTH); - // } - // else - // { - // ++stats.txErrors; - // return false; - // } // Update stats ++stats.txObjects; diff --git a/androidgcs/tests/org/openpilot/uavtalk/TalkTest.java b/androidgcs/tests/org/openpilot/uavtalk/TalkTest.java index 91f3aa198..1b66d9660 100644 --- a/androidgcs/tests/org/openpilot/uavtalk/TalkTest.java +++ b/androidgcs/tests/org/openpilot/uavtalk/TalkTest.java @@ -90,7 +90,7 @@ public class TalkTest { } @Test - public void testSendObjectRequest() { + public void testSendObjectRequest() throws IOException { ByteArrayInputStream is = new ByteArrayInputStream(new byte[0], 0, 0); ByteArrayOutputStream os = new ByteArrayOutputStream(100);