diff --git a/androidgcs/src/org/openpilot/androidgcs/telemetry/HidUAVTalk.java b/androidgcs/src/org/openpilot/androidgcs/telemetry/HidUAVTalk.java index 991e83871..ab170ff48 100644 --- a/androidgcs/src/org/openpilot/androidgcs/telemetry/HidUAVTalk.java +++ b/androidgcs/src/org/openpilot/androidgcs/telemetry/HidUAVTalk.java @@ -28,9 +28,10 @@ import android.util.Log; public class HidUAVTalk extends TelemetryTask { private static final String TAG = HidUAVTalk.class.getSimpleName(); - public static int LOGLEVEL = 0; - public static boolean WARN = LOGLEVEL > 1; - public static boolean DEBUG = LOGLEVEL > 0; + public static final int LOGLEVEL = 1; + public static final boolean DEBUG = LOGLEVEL > 2; + public static final boolean WARN = LOGLEVEL > 1; + public static final boolean ERROR = LOGLEVEL > 0; //! USB constants private static final int MAX_HID_PACKET_SIZE = 64; @@ -46,7 +47,24 @@ public class HidUAVTalk extends TelemetryTask { private static final String ACTION_USB_PERMISSION = "com.access.device.USB_PERMISSION"; - UsbDevice currentDevice; + private UsbDevice currentDevice; + private UsbEndpoint usbEndpointRead; + private UsbEndpoint usbEndpointWrite; + private UsbManager usbManager; + private PendingIntent permissionIntent; + private UsbDeviceConnection usbDeviceConnection; + private IntentFilter permissionFilter; + private UsbInterface usbInterface = null; + private TalkInputStream inTalkStream; + private TalkOutputStream outTalkStream; + private final UsbRequest writeRequest = null; + private UsbRequest readRequest = null; + private Thread readThread; + private Thread writeThread; + + private boolean readPending = false; + private boolean writePending = false; + private IntentFilter deviceAttachedFilter; public HidUAVTalk(OPTelemetryService service) { super(service); @@ -56,15 +74,21 @@ public class HidUAVTalk extends TelemetryTask { public void disconnect() { CleanUpAndClose(); - //hostDisplayActivity.unregisterReceiver(usbReceiver); + telemService.unregisterReceiver(usbReceiver); telemService.unregisterReceiver(usbPermissionReceiver); super.disconnect(); try { - if(readWriteThread != null) { - readWriteThread.join(); - readWriteThread = null; + if(readThread != null) { + readThread.interrupt(); // Make sure not blocking for data + readThread.join(); + readThread = null; + } + if(writeThread != null) { + writeThread.interrupt(); + writeThread.join(); + writeThread = null; } } catch (InterruptedException e) { e.printStackTrace(); @@ -75,13 +99,6 @@ public class HidUAVTalk extends TelemetryTask { readRequest.close(); readRequest = null; } - - if (writeRequest != null) { - writeRequest.cancel(); - writeRequest.close(); - writeRequest = null; - } - } @Override @@ -94,6 +111,11 @@ public class HidUAVTalk extends TelemetryTask { permissionFilter = new IntentFilter(ACTION_USB_PERMISSION); telemService.registerReceiver(usbPermissionReceiver, permissionFilter); + deviceAttachedFilter = new IntentFilter(); + deviceAttachedFilter.addAction(UsbManager.ACTION_USB_DEVICE_ATTACHED); + deviceAttachedFilter.addAction(UsbManager.ACTION_USB_DEVICE_DETACHED); + telemService.registerReceiver(usbReceiver, deviceAttachedFilter); + // Go through all the devices plugged in HashMap deviceList = usbManager.getDeviceList(); if (DEBUG) Log.d(TAG, "Found " + deviceList.size() + " devices"); @@ -160,7 +182,6 @@ public class HidUAVTalk extends TelemetryTask { } }; - /* TODO: Detect dettached events and close the connection private final BroadcastReceiver usbReceiver = new BroadcastReceiver() { @Override @@ -179,8 +200,10 @@ public class HidUAVTalk extends TelemetryTask { { if (device.equals(currentDevice)) { + telemService.toastMessage("Device unplugged while in use"); + if (DEBUG) Log.d(TAG, "Matching device disconnected"); // call your method that cleans up and closes communication with the device - CleanUpAndClose(); + disconnect(); } } } @@ -197,36 +220,13 @@ public class HidUAVTalk extends TelemetryTask { } } } - }; */ + }; - private UsbEndpoint usbEndpointRead; - - private UsbEndpoint usbEndpointWrite; - - private UsbManager usbManager; - - private PendingIntent permissionIntent; - - private UsbDeviceConnection connectionRead; - - private UsbDeviceConnection connectionWrite; - - private IntentFilter permissionFilter; protected void CleanUpAndClose() { - if (UsingSingleInterface) { - if(connectionRead != null && usbInterfaceRead != null) - connectionRead.releaseInterface(usbInterfaceRead); - usbInterfaceRead = null; - } - else { - if(connectionRead != null && usbInterfaceRead != null) - connectionRead.releaseInterface(usbInterfaceRead); - if(connectionWrite != null && usbInterfaceWrite != null) - connectionWrite.releaseInterface(usbInterfaceWrite); - usbInterfaceWrite = null; - usbInterfaceRead = null; - } + if(usbDeviceConnection != null && usbInterface != null) + usbDeviceConnection.releaseInterface(usbInterface); + usbInterface = null; } //Validating the Connected Device - Before asking for permission to connect to the device, it is essential that you ensure that this is a device that you support or expect to connect to. This can be done by validating the devices Vendor ID and Product ID. @@ -244,14 +244,6 @@ public class HidUAVTalk extends TelemetryTask { return false; } - private UsbInterface usbInterfaceRead = null; - private UsbInterface usbInterfaceWrite = null; - private final boolean UsingSingleInterface = true; - - private TalkInputStream inTalkStream; - private TalkOutputStream outTalkStream; - UsbRequest writeRequest = null; - boolean ConnectToDeviceInterface(UsbDevice connectDevice) { // Connecting to the Device - If you are reading and writing, then the device // can either have two end points on a single interface, or two interfaces @@ -262,87 +254,51 @@ public class HidUAVTalk extends TelemetryTask { UsbEndpoint ep1 = null; UsbEndpoint ep2 = null; - - if (UsingSingleInterface) + // Using the same interface for reading and writing + usbInterface = connectDevice.getInterface(0x2); + if (usbInterface.getEndpointCount() == 2) { - // Using the same interface for reading and writing - usbInterfaceRead = connectDevice.getInterface(0x2); - usbInterfaceWrite = usbInterfaceRead; - if (usbInterfaceRead.getEndpointCount() == 2) - { - ep1 = usbInterfaceRead.getEndpoint(0); - ep2 = usbInterfaceRead.getEndpoint(1); - } + ep1 = usbInterface.getEndpoint(0); + ep2 = usbInterface.getEndpoint(1); } - else // if (!UsingSingleInterface) - { - usbInterfaceRead = connectDevice.getInterface(0x01); - usbInterfaceWrite = connectDevice.getInterface(0x02); - if ((usbInterfaceRead.getEndpointCount() == 1) && (usbInterfaceWrite.getEndpointCount() == 1)) - { - ep1 = usbInterfaceRead.getEndpoint(0); - ep2 = usbInterfaceWrite.getEndpoint(0); - } - } - if ((ep1 == null) || (ep2 == null)) { - if (DEBUG) Log.d(TAG, "Null endpoints"); + if (ERROR) Log.e(TAG, "Null endpoints"); return false; } // Determine which endpoint is the read, and which is the write - if (ep1.getType() == UsbConstants.USB_ENDPOINT_XFER_INT) { if (ep1.getDirection() == UsbConstants.USB_DIR_IN) - { usbEndpointRead = ep1; - } else if (ep1.getDirection() == UsbConstants.USB_DIR_OUT) - { usbEndpointWrite = ep1; - } } if (ep2.getType() == UsbConstants.USB_ENDPOINT_XFER_INT) { if (ep2.getDirection() == UsbConstants.USB_DIR_IN) - { usbEndpointRead = ep2; - } else if (ep2.getDirection() == UsbConstants.USB_DIR_OUT) - { usbEndpointWrite = ep2; - } } if ((usbEndpointRead == null) || (usbEndpointWrite == null)) { - if (DEBUG) Log.d(TAG, "Endpoints wrong way around"); + if (ERROR) Log.e(TAG, "Could not find write and read endpoint"); return false; } - connectionRead = usbManager.openDevice(connectDevice); - connectionRead.claimInterface(usbInterfaceRead, true); + // Claim the interface + usbDeviceConnection = usbManager.openDevice(connectDevice); + usbDeviceConnection.claimInterface(usbInterface, true); - if (UsingSingleInterface) - { - connectionWrite = connectionRead; - } - else // if (!UsingSingleInterface) - { - connectionWrite = usbManager.openDevice(connectDevice); - connectionWrite.claimInterface(usbInterfaceWrite, true); - } if (DEBUG) Log.d(TAG, "Opened endpoints"); // Create the USB requests readRequest = new UsbRequest(); - readRequest.initialize(connectionRead, usbEndpointRead); - - writeRequest = new UsbRequest(); - writeRequest.initialize(connectionWrite, usbEndpointWrite); + readRequest.initialize(usbDeviceConnection, usbEndpointRead); inTalkStream = new TalkInputStream(); outTalkStream = new TalkOutputStream(); @@ -355,22 +311,48 @@ public class HidUAVTalk extends TelemetryTask { } }); - readWriteThread = new Thread(new Runnable() { + readThread = new Thread(new Runnable() { @Override public void run() { + // Enqueue the first read + queueRead(); while (!shutdown) { - readData(); - sendData(); + UsbRequest returned = usbDeviceConnection.requestWait(); + if (returned == readRequest) { + if (DEBUG) Log.d(TAG, "Received read request"); + readData(); + } else { + Log.e(TAG, "Received unknown USB response"); + break; + } } } - }, "HID Read Write"); - readWriteThread.start(); + }, "HID Read"); + readThread.start(); + + writeThread = new Thread(new Runnable() { + @Override + public void run() { + if (DEBUG) Log.d(TAG, "Starting HID write thread"); + while(!shutdown) { + try { + if (sendDataSynchronous() == false) + break; + } catch (InterruptedException e) { + break; + } + } + if (DEBUG) Log.d(TAG, "Ending HID write thread"); + } + }, "HID Write"); + writeThread.start(); + + telemService.toastMessage("HID Device Opened"); return true; } - Thread readWriteThread; void displayBuffer(String msg, byte[] buf) { msg += " ("; @@ -384,71 +366,100 @@ public class HidUAVTalk extends TelemetryTask { * Gets a report from HID, extract the meaningful data and push * it to the input stream */ - UsbRequest readRequest = null; - public int readData() { - int bufferDataLength = usbEndpointRead.getMaxPacketSize(); - ByteBuffer buffer = ByteBuffer.allocate(bufferDataLength + 1); + ByteBuffer readBuffer = ByteBuffer.allocate(MAX_HID_PACKET_SIZE); - // queue a request on the interrupt endpoint - if(!readRequest.queue(buffer, bufferDataLength)) { - if (DEBUG) Log.d(TAG, "Failed to queue request"); - return 0; - } - - if (DEBUG) Log.d(TAG, "Request queued"); - - int dataSize; - // wait for status event - if (connectionRead.requestWait() == readRequest) { - // Packet format: - // 0: Report ID (1) - // 1: Number of valid bytes - // 2:63: Data - - dataSize = buffer.get(1); // Data size - //Assert.assertEquals(1, buffer.get()); // Report ID - //Assert.assertTrue(dataSize < buffer.capacity()); - - if (buffer.get(0) != 1 || buffer.get(1) < 0 || buffer.get(1) > (buffer.capacity() - 2)) { - if (DEBUG) Log.d(TAG, "Badly formatted HID packet"); - } else { - byte[] dst = new byte[dataSize]; - buffer.position(2); - buffer.get(dst, 0, dataSize); - if (DEBUG) Log.d(TAG, "Entered read"); - inTalkStream.write(dst); - if (DEBUG) Log.d(TAG, "Got read: " + dataSize + " bytes"); - } - } else - return 0; - - return dataSize; + /** + * Schedules a USB read + */ + private void queueRead() { + synchronized(readRequest) { + if(!readRequest.queue(readBuffer, MAX_HID_PACKET_SIZE)) { + if (ERROR) Log.e(TAG, "Failed to queue request"); + } else + readPending = true; + } } + /** + * Reads data from the last USB transaction and schedules another read + */ + public void readData() { + synchronized(readRequest) { + + if (!readPending) { + if (ERROR) Log.e(TAG, "Tried to read read while a transaction was not pending"); + return; + } + + // We just received a read + readPending = false; + // Packet format: + // 0: Report ID (1) + // 1: Number of valid bytes + // 2:63: Data + + int dataSize = readBuffer.get(1); // Data size + //Assert.assertEquals(1, buffer.get()); // Report ID + //Assert.assertTrue(dataSize < buffer.capacity()); + + if (readBuffer.get(0) != 1 || readBuffer.get(1) < 0 || readBuffer.get(1) > (readBuffer.capacity() - 2)) { + if (ERROR) Log.e(TAG, "Badly formatted HID packet"); + } else { + byte[] dst = new byte[dataSize]; + readBuffer.position(2); + readBuffer.get(dst, 0, dataSize); + if (DEBUG) Log.d(TAG, "Entered read"); + inTalkStream.write(dst); + if (DEBUG) Log.d(TAG, "Got read: " + dataSize + " bytes"); + } + + // Queue another read + queueRead(); + + } + } + + /** * Send a packet if data is available */ public void sendData() { - ByteBuffer packet = null; - do { // Send all the data available to prevent sending backlog - packet = outTalkStream.getHIDpacket(); + synchronized(writeRequest) { + // Don't try and send data till previous request completes + if (writePending) + return; + + ByteBuffer packet = outTalkStream.getHIDpacket(); if (packet != null) { if (DEBUG) Log.d(TAG, "Writing to device()"); int bufferDataLength = usbEndpointWrite.getMaxPacketSize(); Assert.assertTrue(packet.capacity() <= bufferDataLength); - writeRequest.queue(packet, bufferDataLength); - try - { - if (!writeRequest.equals(connectionWrite.requestWait())) - Log.e(TAG, "writeRequest failed"); - } - catch (Exception ex) - { - } + if(writeRequest.queue(packet, bufferDataLength)) + writePending = true; + else if (ERROR) + Log.e(TAG, "Write queuing failed"); } - } while (packet != null); + } + } + + /** + * Send a packet if data is available + * @throws InterruptedException + */ + public boolean sendDataSynchronous() throws InterruptedException { + + ByteBuffer packet = outTalkStream.getHIDpacketBlocking(); + if (packet != null) { + if (DEBUG) Log.d(TAG, "sendDataSynchronous() Writing to device()"); + + if (usbDeviceConnection.bulkTransfer(usbEndpointWrite, packet.array(), MAX_HID_PACKET_SIZE, 1000) < 0) { + Log.e(TAG, "Failed to perform bulk write"); + return false; + } + } + return true; } /*********** Helper classes for telemetry streams ************/ @@ -458,6 +469,20 @@ public class HidUAVTalk extends TelemetryTask { // and ByteFifo.put(byte []) ByteFifo data = new ByteFifo(); + /** + * Blocks until data is available and then returns a properly formatted HID packet + */ + public ByteBuffer getHIDpacketBlocking() throws InterruptedException { + synchronized(data) { + if (data.remaining() == 0) + data.wait(); + return getHIDpacket(); + } + } + + /** + * Gets data from the ByteFifo in a properly formatted HID packet + */ public ByteBuffer getHIDpacket() { ByteBuffer packet = null; synchronized(data) { diff --git a/androidgcs/src/org/openpilot/androidgcs/telemetry/OPTelemetryService.java b/androidgcs/src/org/openpilot/androidgcs/telemetry/OPTelemetryService.java index 4eeb57abe..4698900ef 100644 --- a/androidgcs/src/org/openpilot/androidgcs/telemetry/OPTelemetryService.java +++ b/androidgcs/src/org/openpilot/androidgcs/telemetry/OPTelemetryService.java @@ -26,16 +26,10 @@ */ package org.openpilot.androidgcs.telemetry; -import java.io.IOException; import java.lang.ref.WeakReference; -import java.util.Observable; -import java.util.Observer; -import org.openpilot.uavtalk.Telemetry; -import org.openpilot.uavtalk.TelemetryMonitor; import org.openpilot.uavtalk.UAVDataObject; import org.openpilot.uavtalk.UAVObjectManager; -import org.openpilot.uavtalk.UAVTalk; import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize; import android.app.Service; @@ -127,7 +121,7 @@ public class OPTelemetryService extends Service { break; case 2: Toast.makeText(getApplicationContext(), "Attempting BT connection", Toast.LENGTH_SHORT).show(); - activeTelem = new BTTelemetryThread(); + //activeTelem = new BTTelemetryThread(); break; case 3: Toast.makeText(getApplicationContext(), "Attempting TCP connection", Toast.LENGTH_SHORT).show(); @@ -347,6 +341,8 @@ public class OPTelemetryService extends Service { } } } + + /* private class BTTelemetryThread extends Thread implements TelemTask { private final UAVObjectManager objMngr; @@ -401,7 +397,7 @@ public class OPTelemetryService extends Service { @Override public void update(Observable arg0, Object arg1) { if (DEBUG) Log.d(TAG, "Mon updated. Connected: " + mon.getConnected() + " objects updated: " + mon.getObjectsUpdated()); - if(mon.getConnected() /*&& mon.getObjectsUpdated()*/) { + if(mon.getConnected() ) { Intent intent = new Intent(); intent.setAction(INTENT_ACTION_CONNECTED); sendBroadcast(intent,null); @@ -423,6 +419,5 @@ public class OPTelemetryService extends Service { } if (DEBUG) Log.d(TAG, "UAVTalk stream disconnected"); } - - }; + };*/ } diff --git a/androidgcs/src/org/openpilot/androidgcs/telemetry/TelemetryTask.java b/androidgcs/src/org/openpilot/androidgcs/telemetry/TelemetryTask.java index 87196bc1e..3d16d94f9 100644 --- a/androidgcs/src/org/openpilot/androidgcs/telemetry/TelemetryTask.java +++ b/androidgcs/src/org/openpilot/androidgcs/telemetry/TelemetryTask.java @@ -105,8 +105,8 @@ public abstract class TelemetryTask implements Runnable { // Create the required telemetry objects attached to this // data stream uavTalk = new UAVTalk(inStream, outStream, objMngr); - tel = new Telemetry(uavTalk, objMngr); - mon = new TelemetryMonitor(objMngr,tel); + tel = new Telemetry(uavTalk, objMngr, Looper.myLooper()); + mon = new TelemetryMonitor(objMngr,tel, telemService); // Create an observer to notify system of connection mon.addObserver(connectionObserver); diff --git a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java index f130c33eb..6fcbb2491 100644 --- a/androidgcs/src/org/openpilot/uavtalk/Telemetry.java +++ b/androidgcs/src/org/openpilot/uavtalk/Telemetry.java @@ -35,166 +35,156 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentLinkedQueue; +import junit.framework.Assert; +import android.os.Handler; +import android.os.Looper; import android.util.Log; public class Telemetry { + /** + * Telemetry provides a messaging handler to handle all the object updates + * and transfer requests. This handler can either be attached to a new loop + * attached to the thread started by the telemetry service. + */ private final String TAG = "Telemetry"; - public static int LOGLEVEL = 1; - public static boolean WARN = LOGLEVEL > 2; - public static boolean DEBUG = LOGLEVEL > 1; + public static int LOGLEVEL = 0; + public static boolean DEBUG = LOGLEVEL > 2; + public static boolean WARN = LOGLEVEL > 1; public static boolean ERROR = LOGLEVEL > 0; - public class TelemetryStats { - public int txBytes; - public int rxBytes; - public int txObjectBytes; - public int rxObjectBytes; - public int rxObjects; - public int txObjects; - public int txErrors; - public int rxErrors; - public int txRetries; - } ; - class ObjectTimeInfo { - UAVObject obj; - int updatePeriodMs; /** Update period in ms or 0 if no periodic updates are needed */ - int timeToNextUpdateMs; /** Time delay to the next update */ - }; + public class TelemetryStats { + public int txBytes; + public int rxBytes; + public int txObjectBytes; + public int rxObjectBytes; + public int rxObjects; + public int txObjects; + public int txErrors; + public int rxErrors; + public int txRetries; + }; - class ObjectQueueInfo { - UAVObject obj; - int event; - boolean allInstances; + class ObjectTimeInfo { + UAVObject obj; + int updatePeriodMs; + /** Update period in ms or 0 if no periodic updates are needed */ + int timeToNextUpdateMs; + /** Time delay to the next update */ + }; - @Override + class ObjectQueueInfo { + UAVObject obj; + int event; + boolean allInstances; + + @Override public boolean equals(Object e) { - try { - ObjectQueueInfo o = (ObjectQueueInfo) e; - return o.obj.getObjID() == obj.getObjID() && o.event == event && o.allInstances == allInstances; - } catch (Exception err) { + try { + ObjectQueueInfo o = (ObjectQueueInfo) e; + return o.obj.getObjID() == obj.getObjID() && o.event == event + && o.allInstances == allInstances; + } catch (Exception err) { - }; - return false; - } - }; - - class ObjectTransactionInfo { - UAVObject obj; - boolean allInstances; - boolean objRequest; - int retriesRemaining; - boolean acked; - } ; - - /** - * Events generated by objects. Not enum because used in mask. - */ - private static final int EV_UNPACKED = 0x01; /** Object data updated by unpacking */ - private static final int EV_UPDATED = 0x02; /** Object data updated by changing the data structure */ - private static final int EV_UPDATED_MANUAL = 0x04; /** Object update event manually generated */ - private static final int EV_UPDATE_REQ = 0x08; /** Request to update object data */ - - /** - * Constructor - */ - public Telemetry(UAVTalk utalkIn, UAVObjectManager objMngr) - { - this.utalk = utalkIn; - this.objMngr = objMngr; - - // Process all objects in the list - List< List > objs = objMngr.getObjects(); - ListIterator> li = objs.listIterator(); - while(li.hasNext()) - registerObject(li.next().get(0)); // we only need to register one instance per object type - - // Listen to new object creations - objMngr.addNewInstanceObserver(new Observer() { - @Override - public void update(Observable observable, Object data) { - newInstance((UAVObject) data); } - }); - objMngr.addNewObjectObserver(new Observer() { + ; + return false; + } + }; + + class ObjectTransactionInfo { + UAVObject obj; + boolean allInstances; + boolean objRequest; + int retriesRemaining; + boolean acked; + }; + + /** + * Events generated by objects. Not enum because used in mask. + */ + private static final int EV_UNPACKED = 0x01; + /** Object data updated by unpacking */ + private static final int EV_UPDATED = 0x02; + /** Object data updated by changing the data structure */ + private static final int EV_UPDATED_MANUAL = 0x04; + /** Object update event manually generated */ + private static final int EV_UPDATE_REQ = 0x08; + + /** Request to update object data */ + + /** + * Constructor + */ + public Telemetry(UAVTalk utalkIn, UAVObjectManager objMngr, Looper l) { + this.utalk = utalkIn; + this.objMngr = objMngr; + + // Create a handler for object messages + handler = new ObjectUpdateHandler(l); + + // Process all objects in the list + List> objs = objMngr.getObjects(); + ListIterator> li = objs.listIterator(); + while (li.hasNext()) + registerObject(li.next().get(0)); // we only need to register one + // instance per object type + + // Listen to new object creations + objMngr.addNewInstanceObserver(new Observer() { @Override public void update(Observable observable, Object data) { - newObject((UAVObject) data); - } - }); + newInstance((UAVObject) data); + } + }); + objMngr.addNewObjectObserver(new Observer() { + @Override + public void update(Observable observable, Object data) { + newObject((UAVObject) data); + } + }); - // Listen to transaction completions from uavtalk - utalk.setOnTransactionCompletedListener( - utalk.new OnTransactionCompletedListener() { + // Listen to transaction completions from uavtalk + utalk.setOnTransactionCompletedListener(utalk.new OnTransactionCompletedListener() { @Override void TransactionSucceeded(UAVObject data) { - try { - transactionCompleted(data, true); - } catch (IOException e) { - // Disconnect when stream fails - utalk.setOnTransactionCompletedListener(null); - } - } + transactionCompleted(data, true); + } + @Override void TransactionFailed(UAVObject data) { - try { - if (DEBUG) Log.d(TAG, "TransactionFailed(" + data.getName() + ")"); + if (DEBUG) + Log.d(TAG, "TransactionFailed(" + data.getName() + ")"); - transactionCompleted(data, false); - } catch (IOException e) { - // Disconnect when stream fails - utalk.setOnTransactionCompletedListener(null); - } - } - - }); - - // Get GCS stats object - gcsStatsObj = objMngr.getObject("GCSTelemetryStats"); - - // Setup transaction timer - transPending = false; - // Setup and start the periodic timer - timeToNextUpdateMs = 0; - updateTimerSetPeriod(1000); - // Setup and start the stats timer - txErrors = 0; - txRetries = 0; - } - - synchronized void transTimerSetPeriod(int periodMs) { - if(transTimerTask != null) - transTimerTask.cancel(); - - if(transTimer != null) - transTimer.purge(); - - transTimer = new Timer(); - - transTimerTask = new TimerTask() { - @Override - public void run() { - try { - transactionTimeout(); - } catch (IOException e) { - cancel(); - } + transactionCompleted(data, false); } - }; - transTimer.schedule(transTimerTask, periodMs, periodMs); - } - synchronized void updateTimerSetPeriod(int periodMs) { - if (updateTimer != null) { - updateTimer.cancel(); - updateTimer = null; - } - if (updateTimerTask != null) { - updateTimerTask.cancel(); - updateTimerTask = null; - } - updateTimer = new Timer(); - updateTimerTask = new TimerTask() { + }); + + // Get GCS stats object + gcsStatsObj = objMngr.getObject("GCSTelemetryStats"); + + // Setup transaction timer + transPending = false; + // Setup and start the periodic timer + timeToNextUpdateMs = 0; + updateTimerSetPeriod(1000); + // Setup and start the stats timer + txErrors = 0; + txRetries = 0; + } + + synchronized void updateTimerSetPeriod(int periodMs) { + if (updateTimer != null) { + updateTimer.cancel(); + updateTimer = null; + } + if (updateTimerTask != null) { + updateTimerTask.cancel(); + updateTimerTask = null; + } + updateTimer = new Timer(); + updateTimerTask = new TimerTask() { @Override public void run() { try { @@ -204,574 +194,561 @@ public class Telemetry { updateTimer.cancel(); } } - }; - updateTimer.schedule(updateTimerTask, periodMs, periodMs); + }; + updateTimer.schedule(updateTimerTask, periodMs, periodMs); - } + } - /** - * Register a new object for periodic updates (if enabled) - */ - private synchronized void registerObject(UAVObject obj) - { - // Setup object for periodic updates - addObject(obj); + /** + * Register a new object for periodic updates (if enabled) + */ + private synchronized void registerObject(UAVObject obj) { + // Setup object for periodic updates + addObject(obj); - // Setup object for telemetry updates - updateObject(obj); - } + // Setup object for telemetry updates + updateObject(obj); + } - /** - * Add an object in the list used for periodic updates - */ - private synchronized void addObject(UAVObject obj) - { - // Check if object type is already in the list - ListIterator li = objList.listIterator(); - while(li.hasNext()) { - ObjectTimeInfo n = li.next(); - if( n.obj.getObjID() == obj.getObjID() ) - { - // Object type (not instance!) is already in the list, do nothing - return; - } - } + /** + * Add an object in the list used for periodic updates + */ + private synchronized void addObject(UAVObject obj) { + // Check if object type is already in the list + ListIterator li = objList.listIterator(); + while (li.hasNext()) { + ObjectTimeInfo n = li.next(); + if (n.obj.getObjID() == obj.getObjID()) { + // Object type (not instance!) is already in the list, do + // nothing + return; + } + } - // If this point is reached, then the object type is new, let's add it - ObjectTimeInfo timeInfo = new ObjectTimeInfo(); - timeInfo.obj = obj; - timeInfo.timeToNextUpdateMs = 0; - timeInfo.updatePeriodMs = 0; - objList.add(timeInfo); - } + // If this point is reached, then the object type is new, let's add it + ObjectTimeInfo timeInfo = new ObjectTimeInfo(); + timeInfo.obj = obj; + timeInfo.timeToNextUpdateMs = 0; + timeInfo.updatePeriodMs = 0; + objList.add(timeInfo); + } - /** - * Update the object's timers - */ - private synchronized void setUpdatePeriod(UAVObject obj, int periodMs) - { - // Find object type (not instance!) and update its period - ListIterator li = objList.listIterator(); - while(li.hasNext()) { - ObjectTimeInfo n = li.next(); - if ( n.obj.getObjID() == obj.getObjID() ) - { - n.updatePeriodMs = periodMs; - n.timeToNextUpdateMs = (int) (periodMs * (new java.util.Random()).nextDouble()); // avoid bunching of updates - } - } - } + /** + * Update the object's timers + */ + private synchronized void setUpdatePeriod(UAVObject obj, int periodMs) { + // Find object type (not instance!) and update its period + ListIterator li = objList.listIterator(); + while (li.hasNext()) { + ObjectTimeInfo n = li.next(); + if (n.obj.getObjID() == obj.getObjID()) { + n.updatePeriodMs = periodMs; + n.timeToNextUpdateMs = (int) (periodMs * (new java.util.Random()) + .nextDouble()); // avoid bunching of updates + } + } + } - final Observer unpackedObserver = new Observer() { + final Observer unpackedObserver = new Observer() { @Override public void update(Observable observable, Object data) { - try { - enqueueObjectUpdates((UAVObject) data, EV_UNPACKED, false, true); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } + handler.unpacked((UAVObject) data); + } }; final Observer updatedAutoObserver = new Observer() { @Override public void update(Observable observable, Object data) { - try { - enqueueObjectUpdates((UAVObject) data, EV_UPDATED, false, true); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } + handler.updatedAuto((UAVObject) data); + } }; final Observer updatedManualObserver = new Observer() { @Override public void update(Observable observable, Object data) { - try { - enqueueObjectUpdates((UAVObject) data, EV_UPDATED_MANUAL, false, true); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } + handler.updatedManual((UAVObject) data); + } }; final Observer updatedRequestedObserver = new Observer() { @Override public void update(Observable observable, Object data) { - try { - enqueueObjectUpdates((UAVObject) data, EV_UPDATE_REQ, false, true); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } + handler.updateRequested((UAVObject) data); + } }; - /** - * Connect to all instances of an object depending on the event mask specified - */ - private synchronized void connectToObjectInstances(UAVObject obj, int eventMask) - { - List objs = objMngr.getObjectInstances(obj.getObjID()); - ListIterator li = objs.listIterator(); - while(li.hasNext()) - { - obj = li.next(); + /** + * Connect to all instances of an object depending on the event mask + * specified + */ + private synchronized void connectToObjectInstances(UAVObject obj, + int eventMask) { + List objs = objMngr.getObjectInstances(obj.getObjID()); + ListIterator li = objs.listIterator(); + while (li.hasNext()) { + obj = li.next(); - // Disconnect all previous observers from telemetry. This is imortant as this can - // be called multiple times - obj.removeUnpackedObserver(unpackedObserver); - obj.removeUpdatedAutoObserver(updatedAutoObserver); - obj.removeUpdatedManualObserver(updatedManualObserver); - obj.removeUpdateRequestedObserver(updatedRequestedObserver); + // Disconnect all previous observers from telemetry. This is + // imortant as this can + // be called multiple times + obj.removeUnpackedObserver(unpackedObserver); + obj.removeUpdatedAutoObserver(updatedAutoObserver); + obj.removeUpdatedManualObserver(updatedManualObserver); + obj.removeUpdateRequestedObserver(updatedRequestedObserver); - // Connect only the selected events - if ( (eventMask&EV_UNPACKED) != 0) - obj.addUnpackedObserver(unpackedObserver); - if ( (eventMask&EV_UPDATED) != 0) - obj.addUpdatedAutoObserver(updatedAutoObserver); - if ( (eventMask&EV_UPDATED_MANUAL) != 0) - obj.addUpdatedManualObserver(updatedManualObserver); - if ( (eventMask&EV_UPDATE_REQ) != 0) - obj.addUpdateRequestedObserver(updatedRequestedObserver); - } - } + // Connect only the selected events + if ((eventMask & EV_UNPACKED) != 0) + obj.addUnpackedObserver(unpackedObserver); + if ((eventMask & EV_UPDATED) != 0) + obj.addUpdatedAutoObserver(updatedAutoObserver); + if ((eventMask & EV_UPDATED_MANUAL) != 0) + obj.addUpdatedManualObserver(updatedManualObserver); + if ((eventMask & EV_UPDATE_REQ) != 0) + obj.addUpdateRequestedObserver(updatedRequestedObserver); + } + } - /** - * Update an object based on its metadata properties - */ - private synchronized void updateObject(UAVObject obj) - { - // Get metadata - UAVObject.Metadata metadata = obj.getMetadata(); + /** + * Update an object based on its metadata properties + */ + private void updateObject(UAVObject obj) { + // Get metadata + UAVObject.Metadata metadata = obj.getMetadata(); - // Setup object depending on update mode - int eventMask; - if ( metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_PERIODIC ) - { - // Set update period - setUpdatePeriod(obj, metadata.gcsTelemetryUpdatePeriod); - // Connect signals for all instances - eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ; - if(obj.isMetadata()) - eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events) + // Setup object depending on update mode + int eventMask; + if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_PERIODIC) { + // Set update period + setUpdatePeriod(obj, metadata.gcsTelemetryUpdatePeriod); + // Connect signals for all instances + eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ; + if (obj.isMetadata()) + eventMask |= EV_UNPACKED; // we also need to act on remote + // updates (unpack events) - connectToObjectInstances(obj, eventMask); - } - else if ( metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_ONCHANGE ) - { - // Set update period - setUpdatePeriod(obj, 0); - // Connect signals for all instances - eventMask = EV_UPDATED | EV_UPDATED_MANUAL | EV_UPDATE_REQ; - if(obj.isMetadata()) - eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events) + connectToObjectInstances(obj, eventMask); + } else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_ONCHANGE) { + // Set update period + setUpdatePeriod(obj, 0); + // Connect signals for all instances + eventMask = EV_UPDATED | EV_UPDATED_MANUAL | EV_UPDATE_REQ; + if (obj.isMetadata()) + eventMask |= EV_UNPACKED; // we also need to act on remote + // updates (unpack events) - connectToObjectInstances(obj, eventMask); - } - else if ( metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_THROTTLED ) - { - // TODO - } - else if ( metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_MANUAL ) - { - // Set update period - setUpdatePeriod(obj, 0); - // Connect signals for all instances - eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ; - if(obj.isMetadata()) - eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events) + connectToObjectInstances(obj, eventMask); + } else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_THROTTLED) { + // TODO + } else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_MANUAL) { + // Set update period + setUpdatePeriod(obj, 0); + // Connect signals for all instances + eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ; + if (obj.isMetadata()) + eventMask |= EV_UNPACKED; // we also need to act on remote + // updates (unpack events) - connectToObjectInstances(obj, eventMask); - } - } + connectToObjectInstances(obj, eventMask); + } + } - /** - * Called when a transaction is successfully completed (uavtalk event) - * @throws IOException - */ - private synchronized void transactionCompleted(UAVObject obj, boolean result) throws IOException - { - if (DEBUG) Log.d(TAG,"UAVTalk transactionCompleted"); - // Check if there is a pending transaction and the objects match - if ( transPending && transInfo.obj.getObjID() == obj.getObjID() ) - { - if (DEBUG) Log.d(TAG,"Telemetry: transaction completed for " + obj.getName()); - // Complete transaction - transTimer.cancel(); - transPending = false; + /** + * Check is any objects are pending for periodic updates TODO: Clean-up + * + * @throws IOException + */ + private void processPeriodicUpdates() throws IOException { - //Send signal - obj.transactionCompleted(result); - // Process new object updates from queue - processObjectQueue(); - } else - { - if (ERROR) Log.e(TAG,"Error: received a transaction completed when did not expect it."); - transPending = false; - } - } + if (DEBUG) + Log.d(TAG, "processPeriodicUpdates()"); + // Stop timer - /** - * Called when a transaction is not completed within the timeout period (timer event) - * @throws IOException - */ - private synchronized void transactionTimeout() throws IOException - { - if (DEBUG) Log.d(TAG,"Telemetry: transaction timeout."); - transTimer.cancel(); - // Proceed only if there is a pending transaction - if ( transPending ) - { - // Check if more retries are pending - if (transInfo.retriesRemaining > 0) - { - --transInfo.retriesRemaining; - processObjectTransaction(); - ++txRetries; - } - else - { - if (ERROR) Log.e(TAG, "Transaction failed for: " + transInfo.obj.getName()); + updateTimer.cancel(); - // Terminate transaction. This triggers UAVTalk to send a transaction - // failed signal which will make the next queue entry be processed - // Note this is UAVTalk listener TransactionFailed function and not the - // object specific transaction failed. - utalk.cancelPendingTransaction(transInfo.obj); - ++txErrors; - } - } - } + // Iterate through each object and update its timer, if zero then + // transmit object. + // Also calculate smallest delay to next update (will be used for + // setting timeToNextUpdateMs) + int minDelay = MAX_UPDATE_PERIOD_MS; + ObjectTimeInfo objinfo; + int elapsedMs = 0; + long startTime; + int offset; + ListIterator li = objList.listIterator(); + while (li.hasNext()) { + objinfo = li.next(); + // If object is configured for periodic updates + if (objinfo.updatePeriodMs > 0) { + objinfo.timeToNextUpdateMs -= timeToNextUpdateMs; + // Check if time for the next update + if (objinfo.timeToNextUpdateMs <= 0) { + // Reset timer + offset = (-objinfo.timeToNextUpdateMs) + % objinfo.updatePeriodMs; + objinfo.timeToNextUpdateMs = objinfo.updatePeriodMs + - offset; + // Send object + startTime = System.currentTimeMillis(); - /** - * Start an object transaction with UAVTalk, all information is stored in transInfo - * @throws IOException - */ - private synchronized void processObjectTransaction() throws IOException - { - if (transPending) - { - if (DEBUG) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName()); - // Initiate transaction - if (transInfo.objRequest) - { - utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); - } - else - { - utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances); - } - // Start timer if a response is expected - if ( transInfo.objRequest || transInfo.acked ) - { - transTimerSetPeriod(REQ_TIMEOUT_MS); - } - else - { - transTimer.cancel(); - transPending = false; - } - } else - { - if (ERROR) Log.e(TAG,"Error: inside of processObjectTransaction with no transPending"); - } - } + if (DEBUG) Log.d(TAG, "Manual update: " + objinfo.obj.getName()); + handler.updatedManual(objinfo.obj); + // enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, + // true, false); + elapsedMs = (int) (System.currentTimeMillis() - startTime); + // Update timeToNextUpdateMs with the elapsed delay of + // sending the object; + timeToNextUpdateMs += elapsedMs; + } + // Update minimum delay + if (objinfo.timeToNextUpdateMs < minDelay) { + minDelay = objinfo.timeToNextUpdateMs; + } + } + } - /** - * Enqueue the event received from an object. This is the main method that handles all the callbacks - * from UAVObjects (due to updates, or update requests) - */ - private void enqueueObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) throws IOException - { - // Push event into queue - if (DEBUG) Log.d(TAG, "Push event into queue for obj " + obj.getName() + " event " + event); - if(event == 8 && obj.getName().compareTo("GCSTelemetryStats") == 0) - Thread.dumpStack(); - ObjectQueueInfo objInfo = new ObjectQueueInfo(); - objInfo.obj = obj; - objInfo.event = event; - objInfo.allInstances = allInstances; - if (priority) - { - // Only enqueue if an identical transaction does not already exist - if(!objPriorityQueue.contains(objInfo)) { - if ( objPriorityQueue.size() < MAX_QUEUE_SIZE ) - { - objPriorityQueue.add(objInfo); - } - else - { - ++txErrors; - obj.transactionCompleted(false); - Log.w(TAG,"Telemetry: priority event queue is full, event lost " + obj.getName()); - } - } - } - else - { - // Only enqueue if an identical transaction does not already exist - if(!objQueue.contains(objInfo)) { - if ( objQueue.size() < MAX_QUEUE_SIZE ) - { - objQueue.add(objInfo); - } - else - { - ++txErrors; - obj.transactionCompleted(false); - } - } - } + // Check if delay for the next update is too short + if (minDelay < MIN_UPDATE_PERIOD_MS) { + minDelay = MIN_UPDATE_PERIOD_MS; + } - // If there is no transaction in progress then process event - if (!transPending) - { - processObjectQueue(); - } - } + // Done + timeToNextUpdateMs = minDelay; - /** - * Process events from the object queue - * @throws IOException - */ - private void processObjectQueue() throws IOException - { - if (DEBUG) Log.d(TAG, "Process object queue - Depth " + objQueue.size() + " priority " + objPriorityQueue.size()); + // Restart timer + updateTimerSetPeriod(timeToNextUpdateMs); + } - // Don nothing if a transaction is already in progress (should not happen) - if (transPending) - { - if (WARN) Log.e(TAG,"Dequeue while a transaction pending"); - return; - } + public TelemetryStats getStats() { + // Get UAVTalk stats + UAVTalk.ComStats utalkStats = utalk.getStats(); - // Get object information from queue (first the priority and then the regular queue) - ObjectQueueInfo objInfo; - synchronized (objPriorityQueue) { - if ( !objPriorityQueue.isEmpty() ) - { - objInfo = objPriorityQueue.remove(); - } else { - synchronized (objQueue) { - if ( !objQueue.isEmpty() ) - { - objInfo = objQueue.remove(); - } - else - { - return; - } - } - } - } + // Update stats + TelemetryStats stats = new TelemetryStats(); + stats.txBytes = utalkStats.txBytes; + stats.rxBytes = utalkStats.rxBytes; + stats.txObjectBytes = utalkStats.txObjectBytes; + stats.rxObjectBytes = utalkStats.rxObjectBytes; + stats.rxObjects = utalkStats.rxObjects; + stats.txObjects = utalkStats.txObjects; + stats.txErrors = utalkStats.txErrors + txErrors; + stats.rxErrors = utalkStats.rxErrors; + stats.txRetries = txRetries; - // Check if a connection has been established, only process GCSTelemetryStats updates - // (used to establish the connection) - gcsStatsObj = objMngr.getObject("GCSTelemetryStats"); - if ( ((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0 ) - { - objQueue.clear(); - if ( objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID() ) - { - if (DEBUG) Log.d(TAG,"transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected."); - objInfo.obj.transactionCompleted(false); - return; - } - } + // Done + return stats; + } - // Setup transaction (skip if unpack event) - if ( objInfo.event != EV_UNPACKED ) - { - UAVObject.Metadata metadata = objInfo.obj.getMetadata(); - transInfo.obj = objInfo.obj; - transInfo.allInstances = objInfo.allInstances; - transInfo.retriesRemaining = MAX_RETRIES; - transInfo.acked = metadata.GetGcsTelemetryAcked(); - if ( objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL ) - { - transInfo.objRequest = false; - } - else if ( objInfo.event == EV_UPDATE_REQ ) - { - transInfo.objRequest = true; - } - // Start transaction - transPending = true; - processObjectTransaction(); - } else - { -// qDebug() << QString("Process object queue: this is an unpack event for %1").arg(objInfo.obj->getName()); - } + public void resetStats() { + utalk.resetStats(); + txErrors = 0; + txRetries = 0; + } - // If this is a metaobject then make necessary telemetry updates - if (objInfo.obj.isMetadata()) - { - UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj; - updateObject( metaobj.getParentObject() ); - } + private void newObject(UAVObject obj) { + registerObject(obj); + } - // The fact we received an unpacked event does not mean that - // we do not have additional objects still in the queue, - // so we have to reschedule queue processing to make sure they are not - // stuck: - if ( objInfo.event == EV_UNPACKED && !transPending) - processObjectQueue(); + private synchronized void newInstance(UAVObject obj) { + registerObject(obj); + } - } - - /** - * Check is any objects are pending for periodic updates - * TODO: Clean-up - * @throws IOException - */ - private synchronized void processPeriodicUpdates() throws IOException - { - - if (DEBUG) Log.d(TAG, "processPeriodicUpdates()"); - // Stop timer - updateTimer.cancel(); - - // Iterate through each object and update its timer, if zero then transmit object. - // Also calculate smallest delay to next update (will be used for setting timeToNextUpdateMs) - int minDelay = MAX_UPDATE_PERIOD_MS; - ObjectTimeInfo objinfo; - int elapsedMs = 0; - long startTime; - int offset; - ListIterator li = objList.listIterator(); - while(li.hasNext()) - { - objinfo = li.next(); - // If object is configured for periodic updates - if (objinfo.updatePeriodMs > 0) - { - objinfo.timeToNextUpdateMs -= timeToNextUpdateMs; - // Check if time for the next update - if (objinfo.timeToNextUpdateMs <= 0) - { - // Reset timer - offset = (-objinfo.timeToNextUpdateMs) % objinfo.updatePeriodMs; - objinfo.timeToNextUpdateMs = objinfo.updatePeriodMs - offset; - // Send object - startTime = System.currentTimeMillis(); - enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false); - elapsedMs = (int) (System.currentTimeMillis() - startTime); - // Update timeToNextUpdateMs with the elapsed delay of sending the object; - timeToNextUpdateMs += elapsedMs; - } - // Update minimum delay - if (objinfo.timeToNextUpdateMs < minDelay) - { - minDelay = objinfo.timeToNextUpdateMs; - } - } - } - - // Check if delay for the next update is too short - if (minDelay < MIN_UPDATE_PERIOD_MS) - { - minDelay = MIN_UPDATE_PERIOD_MS; - } - - // Done - timeToNextUpdateMs = minDelay; - - // Restart timer - updateTimerSetPeriod(timeToNextUpdateMs); - } - - public TelemetryStats getStats() - { - // Get UAVTalk stats - UAVTalk.ComStats utalkStats = utalk.getStats(); - - // Update stats - TelemetryStats stats = new TelemetryStats(); - stats.txBytes = utalkStats.txBytes; - stats.rxBytes = utalkStats.rxBytes; - stats.txObjectBytes = utalkStats.txObjectBytes; - stats.rxObjectBytes = utalkStats.rxObjectBytes; - stats.rxObjects = utalkStats.rxObjects; - stats.txObjects = utalkStats.txObjects; - stats.txErrors = utalkStats.txErrors + txErrors; - stats.rxErrors = utalkStats.rxErrors; - stats.txRetries = txRetries; - - // Done - return stats; - } - - public synchronized void resetStats() - { - utalk.resetStats(); - txErrors = 0; - txRetries = 0; - } - - - private void newObject(UAVObject obj) - { - registerObject(obj); - } - - private synchronized void newInstance(UAVObject obj) - { - registerObject(obj); - } - - /** - * Stop all the telemetry timers - */ - public void stopTelemetry() - { - if (updateTimerTask != null) - updateTimerTask.cancel(); - updateTimerTask = null; - if (updateTimer != null) - updateTimer.cancel(); - updateTimer = null; - if (transTimerTask != null) - transTimerTask.cancel(); - transTimerTask = null; - if (transTimer != null) - transTimer.cancel(); - transTimer = null; - } + /** + * Stop all the telemetry timers + */ + public void stopTelemetry() { + if (updateTimerTask != null) + updateTimerTask.cancel(); + updateTimerTask = null; + if (updateTimer != null) + updateTimer.cancel(); + updateTimer = null; + } /** * Private variables */ - private final UAVObjectManager objMngr; - private final UAVTalk utalk; - private UAVObject gcsStatsObj; - private final List objList = new ArrayList(); - private final Queue objQueue = new ConcurrentLinkedQueue(); - private final Queue objPriorityQueue = new ConcurrentLinkedQueue(); - private final ObjectTransactionInfo transInfo = new ObjectTransactionInfo(); - private boolean transPending; + private final UAVObjectManager objMngr; + private final UAVTalk utalk; + private UAVObject gcsStatsObj; + private final List objList = new ArrayList(); + private ObjectTransactionInfo transInfo = new ObjectTransactionInfo(); + private boolean transPending; - private Timer updateTimer; - private TimerTask updateTimerTask; - private Timer transTimer; - private TimerTask transTimerTask; + private Timer updateTimer; + private TimerTask updateTimerTask; - private int timeToNextUpdateMs; - private int txErrors; - private int txRetries; + private int timeToNextUpdateMs; + private int txErrors; + private int txRetries; - /** - * Private constants - */ - private static final int REQ_TIMEOUT_MS = 250; - private static final int MAX_RETRIES = 2; - private static final int MAX_UPDATE_PERIOD_MS = 1000; - private static final int MIN_UPDATE_PERIOD_MS = 1; - private static final int MAX_QUEUE_SIZE = 20; + /** + * Private constants + */ + private static final int REQ_TIMEOUT_MS = 250; + private static final int MAX_RETRIES = 2; + private static final int MAX_UPDATE_PERIOD_MS = 1000; + private static final int MIN_UPDATE_PERIOD_MS = 1; + + static private ObjectUpdateHandler handler; + + //! Accessor for the object updated handler + ObjectUpdateHandler getHandler() { return handler; } + + /** + * Handler which posts all the messages for individual object updates + */ + public class ObjectUpdateHandler extends Handler { + + // ! This can only be created while attaching to a particular looper + ObjectUpdateHandler(Looper l) { + super(l); + } + + Queue objQueue = new ConcurrentLinkedQueue(); + + // ! Generic enqueue + void enqueueObjectUpdates(UAVObject obj, int event, + boolean allInstances, boolean priority) { + + if (DEBUG) Log.d(TAG, "Enqueing update " + obj.getName() + " event " + event); + + ObjectQueueInfo objInfo = new ObjectQueueInfo(); + objInfo.obj = obj; + objInfo.event = event; + objInfo.allInstances = allInstances; + + // For now maintain a list of objects in the queue so we don't add duplicates + // later we should make the runnables static to each class so we can use removeCallback + synchronized(objQueue) { + if (objQueue.contains(objInfo)) { + if (WARN) Log.w(TAG, "Found previously scheduled queue element: " + objInfo.obj.getName()); + } else { + objQueue.add(objInfo); + post(new ObjectRunnable(objInfo)); + } + } + } + + public boolean removeActivatedQueue(ObjectQueueInfo objInfo) { + synchronized(objQueue) { + if (objQueue.remove(objInfo)) { + if (WARN) Log.w(TAG, "Unable to find queue element to remove"); + return false; + } + } + return true; + } + + // ! Enqueue an unpacked event + void unpacked(UAVObject obj) { + enqueueObjectUpdates(obj, EV_UNPACKED, false, true); + } + + // ! Enqueue an updated auto event + void updatedAuto(UAVObject obj) { + enqueueObjectUpdates(obj, EV_UPDATED, false, true); + } + + // ! Enqueue an updated manual event + void updatedManual(UAVObject obj) { + enqueueObjectUpdates(obj, EV_UPDATED_MANUAL, false, true); + } + + // ! Enqueue an update requested event + void updateRequested(UAVObject obj) { + enqueueObjectUpdates(obj, EV_UPDATE_REQ, false, true); + } + + } + + /** + * Perform an update on an object where on an event based on the contents provided + * to the constructors. This update will also set a timeout for transaction failure. + */ + class ObjectRunnable implements Runnable { + + // ! Transaction information to perform + private final ObjectQueueInfo objInfo; + + ObjectRunnable(ObjectQueueInfo info) { + Assert.assertNotNull(info); + objInfo = info; + } + + // ! Perform the transaction on the looper thread + @Override + public void run() { + if (DEBUG) Log.d(TAG, "Object transaction running. Event:" + objInfo.event); + // 1. Check GCS is connected, throw this out if not + // 2. Set up a transaction which includes multiple retries, whether + // to wait for ack etc + // 3. Send UAVTalk message + // 4. Based on transaction type either wait for update or end + + // 1. Check if a connection has been established, only process + // GCSTelemetryStats updates + // (used to establish the connection) + gcsStatsObj = objMngr.getObject("GCSTelemetryStats"); + if (((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0) { + if (objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID()) { + if (DEBUG) + Log.d(TAG, "transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected."); + objInfo.obj.transactionCompleted(false); + return; + } + } + + // If this is a metaobject then make necessary telemetry updates + // (this is why we catch unpack) + if (objInfo.obj.isMetadata()) { + UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj; + updateObject(metaobj.getParentObject()); + } + + // 2. Setup transaction (skip if unpack event) + ObjectTransactionInfo newTrans = new ObjectTransactionInfo(); + boolean newTransactionPending = false; + if (objInfo.event != EV_UNPACKED) { + UAVObject.Metadata metadata = objInfo.obj.getMetadata(); + newTrans.obj = objInfo.obj; + newTrans.allInstances = objInfo.allInstances; + newTrans.retriesRemaining = MAX_RETRIES; + newTrans.acked = metadata.GetGcsTelemetryAcked(); + if (objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL) { + newTrans.objRequest = false; + } else if (objInfo.event == EV_UPDATE_REQ) { + newTrans.objRequest = true; + } + + // Determine if this will schedule a new transaction + newTransactionPending = (newTrans.objRequest || newTrans.acked); + + synchronized (transInfo) { + + // If there is a transaction pending and this would set up a new one reschedule it + if (transPending && newTransactionPending) { + if (WARN) Log.w(TAG, "Postponing transaction for" + newTrans.obj.getName() + " existing transaction for " + transInfo.obj.getName()); + handler.postDelayed(this, 100); + return; + } + + // Store this as the active transaction + transPending = newTransactionPending; + transInfo = newTrans; + + if (DEBUG) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName()); + + // Remove this one from the list of pending transactions + handler.removeActivatedQueue(objInfo); + + try { + + // 3. Execute transaction by sending the appropriate UAVTalk command + if (transInfo.objRequest) { + if (DEBUG) Log.d(TAG, "Sending object request " + transInfo.obj.getName()); + utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); + } else { + if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName()); + utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances); + } + + } catch (IOException e) { + if (ERROR) Log.e(TAG, "Unable to send UAVTalk message"); + e.printStackTrace(); + } + + // Post a timeout timer if a response is epxected + if (transPending) + handler.postDelayed(transactionTimeout, REQ_TIMEOUT_MS); + } + } + } + } + /** + * Runnable posted to handle a timeout of a transaction. Tracks the number of retry attempts + * retries that many, and finally sends a transaction failed signal. + */ + final Runnable transactionTimeout = new Runnable() { + @Override + public void run() { + // Lock on the transaction + synchronized (transInfo) { + + // Proceed only if there is a pending transaction + if (!transPending) { + if (WARN) Log.w(TAG,"Transaction completed but timeout still called. Probable race condition"); + return; + } + + if (DEBUG) Log.d(TAG, "Telemetry: transaction timeout."); + + // Check if more retries are pending + if (transInfo.retriesRemaining > 0) { + --transInfo.retriesRemaining; + + // Repeat whatever is required for this transaction type + // (transInfo.objRequest) { + if (DEBUG) Log.d(TAG, "Sending object request"); + + try { + // Execute transaction by sending the appropriate UAVTalk command + if (transInfo.objRequest) { + if (DEBUG) Log.d(TAG, "Sending object request" + transInfo.obj.getName()); + utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); + } else { + if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName()); + utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances); + } + } catch (IOException e) { + if (ERROR) Log.e(TAG, "Unable to send UAVTalk message"); + e.printStackTrace(); + } + + handler.postDelayed(transactionTimeout, REQ_TIMEOUT_MS); + + ++txRetries; + } else { + if (ERROR) Log.e(TAG, "Transaction failed for: " + transInfo.obj.getName()); + + // Terminate transaction. This triggers UAVTalk to send a transaction + // failed signal which will make the next queue entry be processed + // Note this is UAVTalk listener TransactionFailed function + // object specific transaction failed. + utalk.cancelPendingTransaction(transInfo.obj); + ++txErrors; + } + } + } + }; + + + /** + * Called when a transaction is successfully completed (UAVTalk event) and maps that to + * the appropriate object event as well as canceling the pending transaction and timeout + */ + private void transactionCompleted(UAVObject obj, boolean result) { + + if (DEBUG) Log.d(TAG, "UAVTalk transactionCompleted"); + + // Check if there is a pending transaction and the objects match + synchronized(transInfo) { + if (transPending && transInfo.obj.getObjID() == obj.getObjID()) { + if (DEBUG) Log.d(TAG, "Telemetry: transaction completed for " + obj.getName()); + + // Cancel timeout and complete transaction + handler.removeCallbacks(transactionTimeout); + transPending = false; + + //Send signal + obj.transactionCompleted(result); + } else { + if (ERROR) Log.e(TAG, "Error: received a transaction completed when did not expect it."); + transPending = false; + } + } + } } + diff --git a/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java b/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java index a1e341197..121a5ebcc 100644 --- a/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java +++ b/androidgcs/src/org/openpilot/uavtalk/TelemetryMonitor.java @@ -34,6 +34,8 @@ import java.util.Observer; import java.util.Timer; import java.util.TimerTask; +import org.openpilot.androidgcs.telemetry.OPTelemetryService; + import android.util.Log; public class TelemetryMonitor extends Observable { @@ -60,6 +62,7 @@ public class TelemetryMonitor extends Observable { private long lastUpdateTime; private final List queue; + private OPTelemetryService telemService; private boolean connected = false; private boolean objects_updated = false; @@ -71,6 +74,11 @@ public class TelemetryMonitor extends Observable { return objects_updated; }; + public TelemetryMonitor(UAVObjectManager objMngr, Telemetry tel, OPTelemetryService s) { + this(objMngr, tel); + telemService = s; + } + public TelemetryMonitor(UAVObjectManager objMngr, Telemetry tel) { this.objMngr = objMngr; this.tel = tel; @@ -171,7 +179,9 @@ public class TelemetryMonitor extends Observable { public synchronized void retrieveNextObject() throws IOException { // If queue is empty return if (queue.isEmpty()) { - if (DEBUG || true) Log.d(TAG, "All objects retrieved: Connected Successfully"); + if (telemService != null) + telemService.toastMessage("Connected"); + if (DEBUG) Log.d(TAG, "All objects retrieved: Connected Successfully"); objects_updated = true; if (!HANDSHAKE_IS_CONNECTED) { setChanged(); diff --git a/androidgcs/tests/org/openpilot/uavtalk/TelemetryMonitorTest.java b/androidgcs/tests/org/openpilot/uavtalk/TelemetryMonitorTest.java index 0b0cc1d73..6ddcfe03e 100644 --- a/androidgcs/tests/org/openpilot/uavtalk/TelemetryMonitorTest.java +++ b/androidgcs/tests/org/openpilot/uavtalk/TelemetryMonitorTest.java @@ -1,6 +1,6 @@ package org.openpilot.uavtalk; -import static org.junit.Assert.*; +import static org.junit.Assert.fail; import java.io.IOException; import java.net.InetAddress; @@ -8,11 +8,12 @@ import java.net.Socket; import org.junit.Test; import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize; -import org.openpilot.uavtalk.UAVTalk; + +import android.os.Looper; public class TelemetryMonitorTest { - + static UAVObjectManager objMngr; static UAVTalk talk; static final String IP_ADDRDESS = new String("127.0.0.1"); @@ -32,7 +33,7 @@ public class TelemetryMonitorTest { e.printStackTrace(); fail("Couldn't connect to test platform"); } - + try { talk = new UAVTalk(connection.getInputStream(), connection.getOutputStream(), objMngr); } catch (IOException e) { @@ -40,16 +41,17 @@ public class TelemetryMonitorTest { e.printStackTrace(); fail("Couldn't construct UAVTalk object"); } - + Thread inputStream = talk.getInputProcessThread(); inputStream.start(); - - Telemetry tel = new Telemetry(talk, objMngr); + + Looper.prepare(); + Telemetry tel = new Telemetry(talk, objMngr, Looper.myLooper()); @SuppressWarnings("unused") TelemetryMonitor mon = new TelemetryMonitor(objMngr,tel); - + Thread.sleep(10000); - + System.out.println("Done"); }