1
0
mirror of https://bitbucket.org/librepilot/librepilot.git synced 2025-02-20 10:54:14 +01:00

AndroidGCS: Propagate the exceptions from the low level UAVTalk methods up to

the high level monitoring so it knows when the connection has been terminated.
This commit is contained in:
James Cotton 2012-08-05 21:52:09 -05:00
parent 4c1171e1fb
commit 5670a958a8
6 changed files with 196 additions and 81 deletions

View File

@ -24,13 +24,19 @@ public class TcpUAVTalk {
private UAVTalk uavTalk;
private boolean connected;
private Socket socket;
/**
* Construct a TcpUAVTalk object attached to the OPTelemetryService. Gets the
* connection settings from the preferences.
*/
public TcpUAVTalk(Context caller) {
SharedPreferences prefs = PreferenceManager.getDefaultSharedPreferences(caller);
ip_address = prefs.getString("ip_address","127.0.0.1");
try {
port = Integer.decode(prefs.getString("port", ""));
} catch (NumberFormatException e) {
//TODO: Handle this exception
}
if (DEBUG) Log.d(TAG, "Trying to open UAVTalk with " + ip_address);
@ -38,6 +44,10 @@ public class TcpUAVTalk {
connected = false;
}
/**
* Connect a TCP object to an object manager. Returns true if already
* connected, otherwise returns true if managed a successful socket.
*/
public boolean connect(UAVObjectManager objMngr) {
if( getConnected() )
return true;
@ -45,6 +55,16 @@ public class TcpUAVTalk {
return false;
return true;
}
public void disconnect() {
try {
socket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
socket = null;
}
public boolean getConnected() {
return connected;
@ -54,7 +74,10 @@ public class TcpUAVTalk {
return uavTalk;
}
/**
* Opens a TCP socket to the address determined on construction. If successful
* creates a UAVTalk stream connection this socket to the passed in object manager
*/
private boolean openTelemetryTcp(UAVObjectManager objMngr) {
Log.d(TAG, "Opening connection to " + ip_address + " at address " + port);
@ -66,13 +89,11 @@ public class TcpUAVTalk {
e1.printStackTrace();
return false;
}
Socket socket = null;
socket = null;
try {
socket = new Socket(serverAddr,port);
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
return false;
}

View File

@ -1,5 +1,6 @@
package org.openpilot.uavtalk;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@ -88,7 +89,12 @@ public class Telemetry {
// Listen to transaction completions
utalk.addObserver(new Observer() {
public void update(Observable observable, Object data) {
transactionCompleted((UAVObject) data);
try {
transactionCompleted((UAVObject) data);
} catch (IOException e) {
// Disconnect when stream fails
observable.deleteObserver(this);
}
}
});
@ -117,18 +123,35 @@ public class Telemetry {
transTimerTask = new TimerTask() {
@Override
public void run() {
transactionTimeout();
try {
transactionTimeout();
} catch (IOException e) {
cancel();
}
}
};
transTimer.schedule(transTimerTask, periodMs, periodMs);
}
synchronized void updateTimerSetPeriod(int periodMs) {
if (updateTimer != null) {
updateTimer.cancel();
updateTimer = null;
}
if (updateTimerTask != null) {
updateTimerTask.cancel();
updateTimerTask = null;
}
updateTimer = new Timer();
updateTimerTask = new TimerTask() {
@Override
public void run() {
processPeriodicUpdates();
try {
processPeriodicUpdates();
} catch (IOException e) {
updateTimerTask.cancel();
updateTimer.cancel();
}
}
};
updateTimer.schedule(updateTimerTask, periodMs, periodMs);
@ -206,7 +229,12 @@ public class Telemetry {
{
obj.addUnpackedObserver(new Observer() {
public void update(Observable observable, Object data) {
objectUnpacked( (UAVObject) data);
try {
objectUnpacked( (UAVObject) data);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
@ -214,7 +242,12 @@ public class Telemetry {
{
obj.addUpdatedAutoObserver(new Observer() {
public void update(Observable observable, Object data) {
objectUpdatedAuto( (UAVObject) data);
try {
objectUpdatedAuto( (UAVObject) data);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
@ -222,7 +255,12 @@ public class Telemetry {
{
obj.addUpdatedManualObserver(new Observer() {
public void update(Observable observable, Object data) {
objectUpdatedManual( (UAVObject) data);
try {
objectUpdatedManual( (UAVObject) data);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
@ -230,7 +268,12 @@ public class Telemetry {
{
obj.addUpdateRequestedObserver(new Observer() {
public void update(Observable observable, Object data) {
updateRequested( (UAVObject) data);
try {
updateRequested( (UAVObject) data);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
@ -288,8 +331,9 @@ public class Telemetry {
/**
* Called when a transaction is successfully completed (uavtalk event)
* @throws IOException
*/
private synchronized void transactionCompleted(UAVObject obj)
private synchronized void transactionCompleted(UAVObject obj) throws IOException
{
if (DEBUG) Log.d(TAG,"UAVTalk transactionCompleted");
// Check if there is a pending transaction and the objects match
@ -311,8 +355,9 @@ public class Telemetry {
/**
* Called when a transaction is not completed within the timeout period (timer event)
* @throws IOException
*/
private synchronized void transactionTimeout()
private synchronized void transactionTimeout() throws IOException
{
if (DEBUG) Log.d(TAG,"Telemetry: transaction timeout.");
transTimer.cancel();
@ -342,8 +387,9 @@ public class Telemetry {
/**
* Start an object transaction with UAVTalk, all information is stored in transInfo
* @throws IOException
*/
private synchronized void processObjectTransaction()
private synchronized void processObjectTransaction() throws IOException
{
if (transPending)
{
@ -375,8 +421,9 @@ public class Telemetry {
/**
* Process the event received from an object
* @throws IOException
*/
private synchronized void processObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority)
private synchronized void processObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) throws IOException
{
// Push event into queue
if (DEBUG) Log.d(TAG, "Push event into queue for obj " + obj.getName() + " event " + event);
@ -397,6 +444,7 @@ public class Telemetry {
++txErrors;
obj.transactionCompleted(false);
Log.w(TAG,"Telemetry: priority event queue is full, event lost " + obj.getName());
new Exception().printStackTrace();
}
}
else
@ -421,8 +469,9 @@ public class Telemetry {
/**
* Process events from the object queue
* @throws IOException
*/
private synchronized void processObjectQueue()
private synchronized void processObjectQueue() throws IOException
{
if (DEBUG) Log.d(TAG, "Process object queue - Depth " + objQueue.size() + " priority " + objPriorityQueue.size());
@ -505,8 +554,9 @@ public class Telemetry {
/**
* Check is any objects are pending for periodic updates
* TODO: Clean-up
* @throws IOException
*/
private synchronized void processPeriodicUpdates()
private synchronized void processPeriodicUpdates() throws IOException
{
if (DEBUG) Log.d(TAG, "processPeriodicUpdates()");
@ -590,22 +640,22 @@ public class Telemetry {
txRetries = 0;
}
private synchronized void objectUpdatedAuto(UAVObject obj)
private synchronized void objectUpdatedAuto(UAVObject obj) throws IOException
{
processObjectUpdates(obj, EV_UPDATED, false, true);
}
private synchronized void objectUpdatedManual(UAVObject obj)
private synchronized void objectUpdatedManual(UAVObject obj) throws IOException
{
processObjectUpdates(obj, EV_UPDATED_MANUAL, false, true);
}
private synchronized void objectUnpacked(UAVObject obj)
private synchronized void objectUnpacked(UAVObject obj) throws IOException
{
processObjectUpdates(obj, EV_UNPACKED, false, true);
}
public synchronized void updateRequested(UAVObject obj)
public synchronized void updateRequested(UAVObject obj) throws IOException
{
processObjectUpdates(obj, EV_UPDATE_REQ, false, true);
}
@ -620,6 +670,25 @@ public class Telemetry {
registerObject(obj);
}
/**
* Stop all the telemetry timers
*/
public void stopTelemetry()
{
if (updateTimerTask != null)
updateTimerTask.cancel();
updateTimerTask = null;
if (updateTimer != null)
updateTimer.cancel();
updateTimer = null;
if (transTimerTask != null)
transTimerTask.cancel();
transTimerTask = null;
if (transTimer != null)
transTimer.cancel();
transTimer = null;
}
/**
* Private variables
*/

View File

@ -1,5 +1,6 @@
package org.openpilot.uavtalk;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
@ -50,7 +51,14 @@ public class TelemetryMonitor extends Observable{
flightStatsObj.addUpdatedObserver(new Observer() {
public void update(Observable observable, Object data) {
flightStatsUpdated((UAVObject) data);
try {
flightStatsUpdated((UAVObject) data);
} catch (IOException e) {
// The UAVTalk stream was broken, disconnect this signal
// TODO: Should this actually be disconnected. Do we create a new TelemetryMonitor for this
// or fix the stream?
flightStatsObj.removeUpdatedObserver(this);
}
}
});
@ -60,8 +68,9 @@ public class TelemetryMonitor extends Observable{
/**
* Initiate object retrieval, initialize queue with objects to be retrieved.
* @throws IOException
*/
public synchronized void startRetrievingObjects()
public synchronized void startRetrievingObjects() throws IOException
{
if (DEBUG) Log.d(TAG, "Start retrieving objects");
@ -112,8 +121,9 @@ public class TelemetryMonitor extends Observable{
/**
* Retrieve the next object in the queue
* @throws IOException
*/
public synchronized void retrieveNextObject()
public synchronized void retrieveNextObject() throws IOException
{
// If queue is empty return
if ( queue.isEmpty() )
@ -133,11 +143,18 @@ public class TelemetryMonitor extends Observable{
if (DEBUG) Log.d(TAG, "Retrieving object: " + obj.getName()) ;
// Connect to object
// TODO: Does this need to stay here permanently? This appears to be used for setup mainly
obj.addTransactionCompleted(new Observer() {
public void update(Observable observable, Object data) {
UAVObject.TransactionResult result = (UAVObject.TransactionResult) data;
if (DEBUG) Log.d(TAG,"Got transaction completed event from " + result.obj.getName() + " status: " + result.success);
transactionCompleted(result.obj, result.success);
try {
transactionCompleted(result.obj, result.success);
} catch (IOException e) {
// When the telemetry stream is broken disconnect these updates
observable.deleteObserver(this);
}
}
});
@ -148,8 +165,9 @@ public class TelemetryMonitor extends Observable{
/**
* Called by the retrieved object when a transaction is completed.
* @throws IOException
*/
public synchronized void transactionCompleted(UAVObject obj, boolean success)
public synchronized void transactionCompleted(UAVObject obj, boolean success) throws IOException
{
//QMutexLocker locker(mutex);
// Disconnect from sending object
@ -176,8 +194,9 @@ public class TelemetryMonitor extends Observable{
/**
* Called each time the flight stats object is updated by the autopilot
* @throws IOException
*/
public synchronized void flightStatsUpdated(UAVObject obj)
public synchronized void flightStatsUpdated(UAVObject obj) throws IOException
{
// Force update if not yet connected
gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
@ -193,8 +212,9 @@ public class TelemetryMonitor extends Observable{
/**
* Called periodically to update the statistics and connection status.
* @throws IOException
*/
public synchronized void processStatsUpdates()
public synchronized void processStatsUpdates() throws IOException
{
// Get telemetry stats
if (DEBUG) Log.d(TAG, "processStatsUpdates()");
@ -313,9 +333,20 @@ public class TelemetryMonitor extends Observable{
periodicTask.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
processStatsUpdates();
try {
processStatsUpdates();
} catch (IOException e) {
// Once the stream has died stop trying to process these updates
periodicTask.cancel();
}
}
}, currentPeriod, currentPeriod);
}
public void stopMonitor()
{
periodicTask.cancel();
periodicTask = null;
}
}

View File

@ -45,6 +45,11 @@ public abstract class UAVObject {
transactionCompletedListeners.addObserver(o);
}
}
public void removeTransactionCompleted(Observer o) {
synchronized(transactionCompletedListeners) {
transactionCompletedListeners.deleteObserver(o);
}
}
void transactionCompleted(boolean status) {
synchronized(transactionCompletedListeners) {
transactionCompletedListeners.event(new TransactionResult(this,status));

View File

@ -17,11 +17,12 @@ public class UAVTalk extends Observable {
public static boolean DEBUG = LOGLEVEL > 0;
private Thread inputProcessingThread = null;
private boolean streamFailed = false;
/**
* A reference to the thread for processing the incoming stream
*
* @return
* A reference to the thread for processing the incoming stream. Currently this method is ONLY
* used for unit testing
*/
public Thread getInputProcessThread() {
if (inputProcessingThread == null)
@ -29,8 +30,13 @@ public class UAVTalk extends Observable {
inputProcessingThread = new Thread() {
public void run() {
while(true) {
if( !processInputStream() )
break;
try {
if( !processInputStream() )
break;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
};
@ -175,18 +181,14 @@ public class UAVTalk extends Observable {
/**
* Process any data in the queue
* @throws IOException
*/
public boolean processInputStream() {
public boolean processInputStream() throws IOException {
int val;
try {
// inStream.wait();
val = inStream.read();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return false;
}
//inStream.wait();
val = inStream.read();
if (val == -1) {
return false;
}
@ -201,8 +203,9 @@ public class UAVTalk extends Observable {
* would have been updated by the GCS. \param[in] obj Object to update
* \param[in] allInstances If set true then all instances will be updated
* \return Success (true), Failure (false)
* @throws IOException
*/
public boolean sendObjectRequest(UAVObject obj, boolean allInstances) {
public boolean sendObjectRequest(UAVObject obj, boolean allInstances) throws IOException {
// QMutexLocker locker(mutex);
return objectTransaction(obj, TYPE_OBJ_REQ, allInstances);
}
@ -212,9 +215,10 @@ public class UAVTalk extends Observable {
* Object to send \param[in] acked Selects if an ack is required \param[in]
* allInstances If set true then all instances will be updated \return
* Success (true), Failure (false)
* @throws IOException
*/
public synchronized boolean sendObject(UAVObject obj, boolean acked,
boolean allInstances) {
boolean allInstances) throws IOException {
if (acked) {
return objectTransaction(obj, TYPE_OBJ_ACK, allInstances);
} else {
@ -235,9 +239,10 @@ public class UAVTalk extends Observable {
* request object update TYPE_OBJ_ACK: send object with an ack \param[in]
* allInstances If set true then all instances will be updated \return
* Success (true), Failure (false)
* @throws IOException
*/
public boolean objectTransaction(UAVObject obj, int type,
boolean allInstances) {
boolean allInstances) throws IOException {
// Send object depending on if a response is needed
if (type == TYPE_OBJ_ACK || type == TYPE_OBJ_REQ) {
if (transmitObject(obj, type, allInstances)) {
@ -257,8 +262,9 @@ public class UAVTalk extends Observable {
/**
* Process an byte from the telemetry stream. \param[in] rxbyte Received
* byte \return Success (true), Failure (false)
* @throws IOException
*/
public synchronized boolean processInputByte(int rxbyte) {
public synchronized boolean processInputByte(int rxbyte) throws IOException {
assert (objMngr != null);
// Update stats
@ -471,9 +477,10 @@ public class UAVTalk extends Observable {
* received object \param[in] instId The instance ID of UAVOBJ_ALL_INSTANCES
* for all instances. \param[in] data Data buffer \param[in] length Buffer
* length \return Success (true), Failure (false)
* @throws IOException
*/
public boolean receiveObject(int type, long objId, long instId,
ByteBuffer data) {
ByteBuffer data) throws IOException {
if (DEBUG) Log.d(TAG, "Received object ID: " + objId);
assert (objMngr != null);
@ -622,8 +629,9 @@ public class UAVTalk extends Observable {
* @param[in] type Transaction type
* @param[in] allInstances True is all instances of the object are to be sent
* @return Success (true), Failure (false)
* @throws IOException
*/
public synchronized boolean transmitObject(UAVObject obj, int type, boolean allInstances) {
public synchronized boolean transmitObject(UAVObject obj, int type, boolean allInstances) throws IOException {
// If all instances are requested on a single instance object it is an
// error
if (allInstances && obj.isSingleInstance()) {
@ -658,12 +666,13 @@ public class UAVTalk extends Observable {
}
/**
* Send an object through the telemetry link. \param[in] obj Object handle
* to send \param[in] type Transaction type \return Success (true), Failure
* (false)
* Send an object through the telemetry link.
* @throws IOException
* @param[in] obj Object handle to send
* @param[in] type Transaction type \return Success (true), Failure (false)
*/
public synchronized boolean transmitSingleObject(UAVObject obj, int type,
boolean allInstances) {
boolean allInstances) throws IOException {
int length;
int allInstId = ALL_INSTANCES;
@ -682,8 +691,7 @@ public class UAVTalk extends Observable {
// Setup type and object id fields
bbuf.put((byte) (SYNC_VAL & 0xff));
bbuf.put((byte) (type & 0xff));
bbuf
.putShort((short) (length + 2 /* SYNC, Type */+ 2 /* Size */+ 4 /* ObjID */+ (obj
bbuf.putShort((short) (length + 2 /* SYNC, Type */+ 2 /* Size */+ 4 /* ObjID */+ (obj
.isSingleInstance() ? 0 : 2)));
bbuf.putInt((int)obj.getObjID());
@ -714,31 +722,12 @@ public class UAVTalk extends Observable {
// Calculate checksum
bbuf.put((byte) (updateCRC(0, bbuf.array(), bbuf.position()) & 0xff));
try {
int packlen = bbuf.position();
bbuf.position(0);
byte[] dst = new byte[packlen];
bbuf.get(dst, 0, packlen);
outStream.write(dst);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return false;
}
int packlen = bbuf.position();
bbuf.position(0);
byte[] dst = new byte[packlen];
bbuf.get(dst, 0, packlen);
outStream.write(dst);
// //TODO: Need to use a different outStream type and check that the
// backlog isn't more than TX_BUFFER_SIZE
// // Send buffer, check that the transmit backlog does not grow above
// limit
// if ( io->bytesToWrite() < TX_BUFFER_SIZE )
// {
// io->write((const char*)txBuffer, dataOffset+length+CHECKSUM_LENGTH);
// }
// else
// {
// ++stats.txErrors;
// return false;
// }
// Update stats
++stats.txObjects;

View File

@ -90,7 +90,7 @@ public class TalkTest {
}
@Test
public void testSendObjectRequest() {
public void testSendObjectRequest() throws IOException {
ByteArrayInputStream is = new ByteArrayInputStream(new byte[0], 0, 0);
ByteArrayOutputStream os = new ByteArrayOutputStream(100);