1
0
mirror of https://bitbucket.org/librepilot/librepilot.git synced 2024-11-30 08:24:11 +01:00

Merge branch 'android_telemetry_handler' into revo

Conflicts:
	androidgcs/src/org/openpilot/uavtalk/Telemetry.java
This commit is contained in:
James Cotton 2012-08-16 03:59:37 -05:00
commit 31267d9b8c
6 changed files with 837 additions and 828 deletions

View File

@ -28,9 +28,10 @@ import android.util.Log;
public class HidUAVTalk extends TelemetryTask { public class HidUAVTalk extends TelemetryTask {
private static final String TAG = HidUAVTalk.class.getSimpleName(); private static final String TAG = HidUAVTalk.class.getSimpleName();
public static int LOGLEVEL = 0; public static final int LOGLEVEL = 1;
public static boolean WARN = LOGLEVEL > 1; public static final boolean DEBUG = LOGLEVEL > 2;
public static boolean DEBUG = LOGLEVEL > 0; public static final boolean WARN = LOGLEVEL > 1;
public static final boolean ERROR = LOGLEVEL > 0;
//! USB constants //! USB constants
private static final int MAX_HID_PACKET_SIZE = 64; private static final int MAX_HID_PACKET_SIZE = 64;
@ -46,7 +47,24 @@ public class HidUAVTalk extends TelemetryTask {
private static final String ACTION_USB_PERMISSION = "com.access.device.USB_PERMISSION"; private static final String ACTION_USB_PERMISSION = "com.access.device.USB_PERMISSION";
UsbDevice currentDevice; private UsbDevice currentDevice;
private UsbEndpoint usbEndpointRead;
private UsbEndpoint usbEndpointWrite;
private UsbManager usbManager;
private PendingIntent permissionIntent;
private UsbDeviceConnection usbDeviceConnection;
private IntentFilter permissionFilter;
private UsbInterface usbInterface = null;
private TalkInputStream inTalkStream;
private TalkOutputStream outTalkStream;
private final UsbRequest writeRequest = null;
private UsbRequest readRequest = null;
private Thread readThread;
private Thread writeThread;
private boolean readPending = false;
private boolean writePending = false;
private IntentFilter deviceAttachedFilter;
public HidUAVTalk(OPTelemetryService service) { public HidUAVTalk(OPTelemetryService service) {
super(service); super(service);
@ -56,15 +74,21 @@ public class HidUAVTalk extends TelemetryTask {
public void disconnect() { public void disconnect() {
CleanUpAndClose(); CleanUpAndClose();
//hostDisplayActivity.unregisterReceiver(usbReceiver); telemService.unregisterReceiver(usbReceiver);
telemService.unregisterReceiver(usbPermissionReceiver); telemService.unregisterReceiver(usbPermissionReceiver);
super.disconnect(); super.disconnect();
try { try {
if(readWriteThread != null) { if(readThread != null) {
readWriteThread.join(); readThread.interrupt(); // Make sure not blocking for data
readWriteThread = null; readThread.join();
readThread = null;
}
if(writeThread != null) {
writeThread.interrupt();
writeThread.join();
writeThread = null;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
@ -75,13 +99,6 @@ public class HidUAVTalk extends TelemetryTask {
readRequest.close(); readRequest.close();
readRequest = null; readRequest = null;
} }
if (writeRequest != null) {
writeRequest.cancel();
writeRequest.close();
writeRequest = null;
}
} }
@Override @Override
@ -94,6 +111,11 @@ public class HidUAVTalk extends TelemetryTask {
permissionFilter = new IntentFilter(ACTION_USB_PERMISSION); permissionFilter = new IntentFilter(ACTION_USB_PERMISSION);
telemService.registerReceiver(usbPermissionReceiver, permissionFilter); telemService.registerReceiver(usbPermissionReceiver, permissionFilter);
deviceAttachedFilter = new IntentFilter();
deviceAttachedFilter.addAction(UsbManager.ACTION_USB_DEVICE_ATTACHED);
deviceAttachedFilter.addAction(UsbManager.ACTION_USB_DEVICE_DETACHED);
telemService.registerReceiver(usbReceiver, deviceAttachedFilter);
// Go through all the devices plugged in // Go through all the devices plugged in
HashMap<String, UsbDevice> deviceList = usbManager.getDeviceList(); HashMap<String, UsbDevice> deviceList = usbManager.getDeviceList();
if (DEBUG) Log.d(TAG, "Found " + deviceList.size() + " devices"); if (DEBUG) Log.d(TAG, "Found " + deviceList.size() + " devices");
@ -160,7 +182,6 @@ public class HidUAVTalk extends TelemetryTask {
} }
}; };
/* TODO: Detect dettached events and close the connection
private final BroadcastReceiver usbReceiver = new BroadcastReceiver() private final BroadcastReceiver usbReceiver = new BroadcastReceiver()
{ {
@Override @Override
@ -179,8 +200,10 @@ public class HidUAVTalk extends TelemetryTask {
{ {
if (device.equals(currentDevice)) if (device.equals(currentDevice))
{ {
telemService.toastMessage("Device unplugged while in use");
if (DEBUG) Log.d(TAG, "Matching device disconnected");
// call your method that cleans up and closes communication with the device // call your method that cleans up and closes communication with the device
CleanUpAndClose(); disconnect();
} }
} }
} }
@ -197,36 +220,13 @@ public class HidUAVTalk extends TelemetryTask {
} }
} }
} }
}; */ };
private UsbEndpoint usbEndpointRead;
private UsbEndpoint usbEndpointWrite;
private UsbManager usbManager;
private PendingIntent permissionIntent;
private UsbDeviceConnection connectionRead;
private UsbDeviceConnection connectionWrite;
private IntentFilter permissionFilter;
protected void CleanUpAndClose() { protected void CleanUpAndClose() {
if (UsingSingleInterface) { if(usbDeviceConnection != null && usbInterface != null)
if(connectionRead != null && usbInterfaceRead != null) usbDeviceConnection.releaseInterface(usbInterface);
connectionRead.releaseInterface(usbInterfaceRead); usbInterface = null;
usbInterfaceRead = null;
}
else {
if(connectionRead != null && usbInterfaceRead != null)
connectionRead.releaseInterface(usbInterfaceRead);
if(connectionWrite != null && usbInterfaceWrite != null)
connectionWrite.releaseInterface(usbInterfaceWrite);
usbInterfaceWrite = null;
usbInterfaceRead = null;
}
} }
//Validating the Connected Device - Before asking for permission to connect to the device, it is essential that you ensure that this is a device that you support or expect to connect to. This can be done by validating the devices Vendor ID and Product ID. //Validating the Connected Device - Before asking for permission to connect to the device, it is essential that you ensure that this is a device that you support or expect to connect to. This can be done by validating the devices Vendor ID and Product ID.
@ -244,14 +244,6 @@ public class HidUAVTalk extends TelemetryTask {
return false; return false;
} }
private UsbInterface usbInterfaceRead = null;
private UsbInterface usbInterfaceWrite = null;
private final boolean UsingSingleInterface = true;
private TalkInputStream inTalkStream;
private TalkOutputStream outTalkStream;
UsbRequest writeRequest = null;
boolean ConnectToDeviceInterface(UsbDevice connectDevice) { boolean ConnectToDeviceInterface(UsbDevice connectDevice) {
// Connecting to the Device - If you are reading and writing, then the device // Connecting to the Device - If you are reading and writing, then the device
// can either have two end points on a single interface, or two interfaces // can either have two end points on a single interface, or two interfaces
@ -262,87 +254,51 @@ public class HidUAVTalk extends TelemetryTask {
UsbEndpoint ep1 = null; UsbEndpoint ep1 = null;
UsbEndpoint ep2 = null; UsbEndpoint ep2 = null;
// Using the same interface for reading and writing
if (UsingSingleInterface) usbInterface = connectDevice.getInterface(0x2);
if (usbInterface.getEndpointCount() == 2)
{ {
// Using the same interface for reading and writing ep1 = usbInterface.getEndpoint(0);
usbInterfaceRead = connectDevice.getInterface(0x2); ep2 = usbInterface.getEndpoint(1);
usbInterfaceWrite = usbInterfaceRead;
if (usbInterfaceRead.getEndpointCount() == 2)
{
ep1 = usbInterfaceRead.getEndpoint(0);
ep2 = usbInterfaceRead.getEndpoint(1);
}
} }
else // if (!UsingSingleInterface)
{
usbInterfaceRead = connectDevice.getInterface(0x01);
usbInterfaceWrite = connectDevice.getInterface(0x02);
if ((usbInterfaceRead.getEndpointCount() == 1) && (usbInterfaceWrite.getEndpointCount() == 1))
{
ep1 = usbInterfaceRead.getEndpoint(0);
ep2 = usbInterfaceWrite.getEndpoint(0);
}
}
if ((ep1 == null) || (ep2 == null)) if ((ep1 == null) || (ep2 == null))
{ {
if (DEBUG) Log.d(TAG, "Null endpoints"); if (ERROR) Log.e(TAG, "Null endpoints");
return false; return false;
} }
// Determine which endpoint is the read, and which is the write // Determine which endpoint is the read, and which is the write
if (ep1.getType() == UsbConstants.USB_ENDPOINT_XFER_INT) if (ep1.getType() == UsbConstants.USB_ENDPOINT_XFER_INT)
{ {
if (ep1.getDirection() == UsbConstants.USB_DIR_IN) if (ep1.getDirection() == UsbConstants.USB_DIR_IN)
{
usbEndpointRead = ep1; usbEndpointRead = ep1;
}
else if (ep1.getDirection() == UsbConstants.USB_DIR_OUT) else if (ep1.getDirection() == UsbConstants.USB_DIR_OUT)
{
usbEndpointWrite = ep1; usbEndpointWrite = ep1;
}
} }
if (ep2.getType() == UsbConstants.USB_ENDPOINT_XFER_INT) if (ep2.getType() == UsbConstants.USB_ENDPOINT_XFER_INT)
{ {
if (ep2.getDirection() == UsbConstants.USB_DIR_IN) if (ep2.getDirection() == UsbConstants.USB_DIR_IN)
{
usbEndpointRead = ep2; usbEndpointRead = ep2;
}
else if (ep2.getDirection() == UsbConstants.USB_DIR_OUT) else if (ep2.getDirection() == UsbConstants.USB_DIR_OUT)
{
usbEndpointWrite = ep2; usbEndpointWrite = ep2;
}
} }
if ((usbEndpointRead == null) || (usbEndpointWrite == null)) if ((usbEndpointRead == null) || (usbEndpointWrite == null))
{ {
if (DEBUG) Log.d(TAG, "Endpoints wrong way around"); if (ERROR) Log.e(TAG, "Could not find write and read endpoint");
return false; return false;
} }
connectionRead = usbManager.openDevice(connectDevice);
connectionRead.claimInterface(usbInterfaceRead, true);
// Claim the interface
usbDeviceConnection = usbManager.openDevice(connectDevice);
usbDeviceConnection.claimInterface(usbInterface, true);
if (UsingSingleInterface)
{
connectionWrite = connectionRead;
}
else // if (!UsingSingleInterface)
{
connectionWrite = usbManager.openDevice(connectDevice);
connectionWrite.claimInterface(usbInterfaceWrite, true);
}
if (DEBUG) Log.d(TAG, "Opened endpoints"); if (DEBUG) Log.d(TAG, "Opened endpoints");
// Create the USB requests // Create the USB requests
readRequest = new UsbRequest(); readRequest = new UsbRequest();
readRequest.initialize(connectionRead, usbEndpointRead); readRequest.initialize(usbDeviceConnection, usbEndpointRead);
writeRequest = new UsbRequest();
writeRequest.initialize(connectionWrite, usbEndpointWrite);
inTalkStream = new TalkInputStream(); inTalkStream = new TalkInputStream();
outTalkStream = new TalkOutputStream(); outTalkStream = new TalkOutputStream();
@ -355,22 +311,48 @@ public class HidUAVTalk extends TelemetryTask {
} }
}); });
readWriteThread = new Thread(new Runnable() { readThread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
// Enqueue the first read
queueRead();
while (!shutdown) { while (!shutdown) {
readData(); UsbRequest returned = usbDeviceConnection.requestWait();
sendData(); if (returned == readRequest) {
if (DEBUG) Log.d(TAG, "Received read request");
readData();
} else {
Log.e(TAG, "Received unknown USB response");
break;
}
} }
} }
}, "HID Read Write"); }, "HID Read");
readWriteThread.start(); readThread.start();
writeThread = new Thread(new Runnable() {
@Override
public void run() {
if (DEBUG) Log.d(TAG, "Starting HID write thread");
while(!shutdown) {
try {
if (sendDataSynchronous() == false)
break;
} catch (InterruptedException e) {
break;
}
}
if (DEBUG) Log.d(TAG, "Ending HID write thread");
}
}, "HID Write");
writeThread.start();
telemService.toastMessage("HID Device Opened");
return true; return true;
} }
Thread readWriteThread;
void displayBuffer(String msg, byte[] buf) { void displayBuffer(String msg, byte[] buf) {
msg += " ("; msg += " (";
@ -384,71 +366,100 @@ public class HidUAVTalk extends TelemetryTask {
* Gets a report from HID, extract the meaningful data and push * Gets a report from HID, extract the meaningful data and push
* it to the input stream * it to the input stream
*/ */
UsbRequest readRequest = null; ByteBuffer readBuffer = ByteBuffer.allocate(MAX_HID_PACKET_SIZE);
public int readData() {
int bufferDataLength = usbEndpointRead.getMaxPacketSize();
ByteBuffer buffer = ByteBuffer.allocate(bufferDataLength + 1);
// queue a request on the interrupt endpoint /**
if(!readRequest.queue(buffer, bufferDataLength)) { * Schedules a USB read
if (DEBUG) Log.d(TAG, "Failed to queue request"); */
return 0; private void queueRead() {
} synchronized(readRequest) {
if(!readRequest.queue(readBuffer, MAX_HID_PACKET_SIZE)) {
if (DEBUG) Log.d(TAG, "Request queued"); if (ERROR) Log.e(TAG, "Failed to queue request");
} else
int dataSize; readPending = true;
// wait for status event }
if (connectionRead.requestWait() == readRequest) {
// Packet format:
// 0: Report ID (1)
// 1: Number of valid bytes
// 2:63: Data
dataSize = buffer.get(1); // Data size
//Assert.assertEquals(1, buffer.get()); // Report ID
//Assert.assertTrue(dataSize < buffer.capacity());
if (buffer.get(0) != 1 || buffer.get(1) < 0 || buffer.get(1) > (buffer.capacity() - 2)) {
if (DEBUG) Log.d(TAG, "Badly formatted HID packet");
} else {
byte[] dst = new byte[dataSize];
buffer.position(2);
buffer.get(dst, 0, dataSize);
if (DEBUG) Log.d(TAG, "Entered read");
inTalkStream.write(dst);
if (DEBUG) Log.d(TAG, "Got read: " + dataSize + " bytes");
}
} else
return 0;
return dataSize;
} }
/**
* Reads data from the last USB transaction and schedules another read
*/
public void readData() {
synchronized(readRequest) {
if (!readPending) {
if (ERROR) Log.e(TAG, "Tried to read read while a transaction was not pending");
return;
}
// We just received a read
readPending = false;
// Packet format:
// 0: Report ID (1)
// 1: Number of valid bytes
// 2:63: Data
int dataSize = readBuffer.get(1); // Data size
//Assert.assertEquals(1, buffer.get()); // Report ID
//Assert.assertTrue(dataSize < buffer.capacity());
if (readBuffer.get(0) != 1 || readBuffer.get(1) < 0 || readBuffer.get(1) > (readBuffer.capacity() - 2)) {
if (ERROR) Log.e(TAG, "Badly formatted HID packet");
} else {
byte[] dst = new byte[dataSize];
readBuffer.position(2);
readBuffer.get(dst, 0, dataSize);
if (DEBUG) Log.d(TAG, "Entered read");
inTalkStream.write(dst);
if (DEBUG) Log.d(TAG, "Got read: " + dataSize + " bytes");
}
// Queue another read
queueRead();
}
}
/** /**
* Send a packet if data is available * Send a packet if data is available
*/ */
public void sendData() { public void sendData() {
ByteBuffer packet = null; synchronized(writeRequest) {
do { // Send all the data available to prevent sending backlog // Don't try and send data till previous request completes
packet = outTalkStream.getHIDpacket(); if (writePending)
return;
ByteBuffer packet = outTalkStream.getHIDpacket();
if (packet != null) { if (packet != null) {
if (DEBUG) Log.d(TAG, "Writing to device()"); if (DEBUG) Log.d(TAG, "Writing to device()");
int bufferDataLength = usbEndpointWrite.getMaxPacketSize(); int bufferDataLength = usbEndpointWrite.getMaxPacketSize();
Assert.assertTrue(packet.capacity() <= bufferDataLength); Assert.assertTrue(packet.capacity() <= bufferDataLength);
writeRequest.queue(packet, bufferDataLength); if(writeRequest.queue(packet, bufferDataLength))
try writePending = true;
{ else if (ERROR)
if (!writeRequest.equals(connectionWrite.requestWait())) Log.e(TAG, "Write queuing failed");
Log.e(TAG, "writeRequest failed");
}
catch (Exception ex)
{
}
} }
} while (packet != null); }
}
/**
* Send a packet if data is available
* @throws InterruptedException
*/
public boolean sendDataSynchronous() throws InterruptedException {
ByteBuffer packet = outTalkStream.getHIDpacketBlocking();
if (packet != null) {
if (DEBUG) Log.d(TAG, "sendDataSynchronous() Writing to device()");
if (usbDeviceConnection.bulkTransfer(usbEndpointWrite, packet.array(), MAX_HID_PACKET_SIZE, 1000) < 0) {
Log.e(TAG, "Failed to perform bulk write");
return false;
}
}
return true;
} }
/*********** Helper classes for telemetry streams ************/ /*********** Helper classes for telemetry streams ************/
@ -458,6 +469,20 @@ public class HidUAVTalk extends TelemetryTask {
// and ByteFifo.put(byte []) // and ByteFifo.put(byte [])
ByteFifo data = new ByteFifo(); ByteFifo data = new ByteFifo();
/**
* Blocks until data is available and then returns a properly formatted HID packet
*/
public ByteBuffer getHIDpacketBlocking() throws InterruptedException {
synchronized(data) {
if (data.remaining() == 0)
data.wait();
return getHIDpacket();
}
}
/**
* Gets data from the ByteFifo in a properly formatted HID packet
*/
public ByteBuffer getHIDpacket() { public ByteBuffer getHIDpacket() {
ByteBuffer packet = null; ByteBuffer packet = null;
synchronized(data) { synchronized(data) {

View File

@ -26,16 +26,10 @@
*/ */
package org.openpilot.androidgcs.telemetry; package org.openpilot.androidgcs.telemetry;
import java.io.IOException;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.Observable;
import java.util.Observer;
import org.openpilot.uavtalk.Telemetry;
import org.openpilot.uavtalk.TelemetryMonitor;
import org.openpilot.uavtalk.UAVDataObject; import org.openpilot.uavtalk.UAVDataObject;
import org.openpilot.uavtalk.UAVObjectManager; import org.openpilot.uavtalk.UAVObjectManager;
import org.openpilot.uavtalk.UAVTalk;
import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize; import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize;
import android.app.Service; import android.app.Service;
@ -127,7 +121,7 @@ public class OPTelemetryService extends Service {
break; break;
case 2: case 2:
Toast.makeText(getApplicationContext(), "Attempting BT connection", Toast.LENGTH_SHORT).show(); Toast.makeText(getApplicationContext(), "Attempting BT connection", Toast.LENGTH_SHORT).show();
activeTelem = new BTTelemetryThread(); //activeTelem = new BTTelemetryThread();
break; break;
case 3: case 3:
Toast.makeText(getApplicationContext(), "Attempting TCP connection", Toast.LENGTH_SHORT).show(); Toast.makeText(getApplicationContext(), "Attempting TCP connection", Toast.LENGTH_SHORT).show();
@ -347,6 +341,8 @@ public class OPTelemetryService extends Service {
} }
} }
} }
/*
private class BTTelemetryThread extends Thread implements TelemTask { private class BTTelemetryThread extends Thread implements TelemTask {
private final UAVObjectManager objMngr; private final UAVObjectManager objMngr;
@ -401,7 +397,7 @@ public class OPTelemetryService extends Service {
@Override @Override
public void update(Observable arg0, Object arg1) { public void update(Observable arg0, Object arg1) {
if (DEBUG) Log.d(TAG, "Mon updated. Connected: " + mon.getConnected() + " objects updated: " + mon.getObjectsUpdated()); if (DEBUG) Log.d(TAG, "Mon updated. Connected: " + mon.getConnected() + " objects updated: " + mon.getObjectsUpdated());
if(mon.getConnected() /*&& mon.getObjectsUpdated()*/) { if(mon.getConnected() ) {
Intent intent = new Intent(); Intent intent = new Intent();
intent.setAction(INTENT_ACTION_CONNECTED); intent.setAction(INTENT_ACTION_CONNECTED);
sendBroadcast(intent,null); sendBroadcast(intent,null);
@ -423,6 +419,5 @@ public class OPTelemetryService extends Service {
} }
if (DEBUG) Log.d(TAG, "UAVTalk stream disconnected"); if (DEBUG) Log.d(TAG, "UAVTalk stream disconnected");
} }
};*/
};
} }

View File

@ -105,8 +105,8 @@ public abstract class TelemetryTask implements Runnable {
// Create the required telemetry objects attached to this // Create the required telemetry objects attached to this
// data stream // data stream
uavTalk = new UAVTalk(inStream, outStream, objMngr); uavTalk = new UAVTalk(inStream, outStream, objMngr);
tel = new Telemetry(uavTalk, objMngr); tel = new Telemetry(uavTalk, objMngr, Looper.myLooper());
mon = new TelemetryMonitor(objMngr,tel); mon = new TelemetryMonitor(objMngr,tel, telemService);
// Create an observer to notify system of connection // Create an observer to notify system of connection
mon.addObserver(connectionObserver); mon.addObserver(connectionObserver);

View File

@ -35,166 +35,156 @@ import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import junit.framework.Assert;
import android.os.Handler;
import android.os.Looper;
import android.util.Log; import android.util.Log;
public class Telemetry { public class Telemetry {
/**
* Telemetry provides a messaging handler to handle all the object updates
* and transfer requests. This handler can either be attached to a new loop
* attached to the thread started by the telemetry service.
*/
private final String TAG = "Telemetry"; private final String TAG = "Telemetry";
public static int LOGLEVEL = 1; public static int LOGLEVEL = 0;
public static boolean WARN = LOGLEVEL > 2; public static boolean DEBUG = LOGLEVEL > 2;
public static boolean DEBUG = LOGLEVEL > 1; public static boolean WARN = LOGLEVEL > 1;
public static boolean ERROR = LOGLEVEL > 0; public static boolean ERROR = LOGLEVEL > 0;
public class TelemetryStats {
public int txBytes;
public int rxBytes;
public int txObjectBytes;
public int rxObjectBytes;
public int rxObjects;
public int txObjects;
public int txErrors;
public int rxErrors;
public int txRetries;
} ;
class ObjectTimeInfo { public class TelemetryStats {
UAVObject obj; public int txBytes;
int updatePeriodMs; /** Update period in ms or 0 if no periodic updates are needed */ public int rxBytes;
int timeToNextUpdateMs; /** Time delay to the next update */ public int txObjectBytes;
}; public int rxObjectBytes;
public int rxObjects;
public int txObjects;
public int txErrors;
public int rxErrors;
public int txRetries;
};
class ObjectQueueInfo { class ObjectTimeInfo {
UAVObject obj; UAVObject obj;
int event; int updatePeriodMs;
boolean allInstances; /** Update period in ms or 0 if no periodic updates are needed */
int timeToNextUpdateMs;
/** Time delay to the next update */
};
@Override class ObjectQueueInfo {
UAVObject obj;
int event;
boolean allInstances;
@Override
public boolean equals(Object e) { public boolean equals(Object e) {
try { try {
ObjectQueueInfo o = (ObjectQueueInfo) e; ObjectQueueInfo o = (ObjectQueueInfo) e;
return o.obj.getObjID() == obj.getObjID() && o.event == event && o.allInstances == allInstances; return o.obj.getObjID() == obj.getObjID() && o.event == event
} catch (Exception err) { && o.allInstances == allInstances;
} catch (Exception err) {
};
return false;
}
};
class ObjectTransactionInfo {
UAVObject obj;
boolean allInstances;
boolean objRequest;
int retriesRemaining;
boolean acked;
} ;
/**
* Events generated by objects. Not enum because used in mask.
*/
private static final int EV_UNPACKED = 0x01; /** Object data updated by unpacking */
private static final int EV_UPDATED = 0x02; /** Object data updated by changing the data structure */
private static final int EV_UPDATED_MANUAL = 0x04; /** Object update event manually generated */
private static final int EV_UPDATE_REQ = 0x08; /** Request to update object data */
/**
* Constructor
*/
public Telemetry(UAVTalk utalkIn, UAVObjectManager objMngr)
{
this.utalk = utalkIn;
this.objMngr = objMngr;
// Process all objects in the list
List< List<UAVObject> > objs = objMngr.getObjects();
ListIterator<List<UAVObject>> li = objs.listIterator();
while(li.hasNext())
registerObject(li.next().get(0)); // we only need to register one instance per object type
// Listen to new object creations
objMngr.addNewInstanceObserver(new Observer() {
@Override
public void update(Observable observable, Object data) {
newInstance((UAVObject) data);
} }
}); ;
objMngr.addNewObjectObserver(new Observer() { return false;
}
};
class ObjectTransactionInfo {
UAVObject obj;
boolean allInstances;
boolean objRequest;
int retriesRemaining;
boolean acked;
};
/**
* Events generated by objects. Not enum because used in mask.
*/
private static final int EV_UNPACKED = 0x01;
/** Object data updated by unpacking */
private static final int EV_UPDATED = 0x02;
/** Object data updated by changing the data structure */
private static final int EV_UPDATED_MANUAL = 0x04;
/** Object update event manually generated */
private static final int EV_UPDATE_REQ = 0x08;
/** Request to update object data */
/**
* Constructor
*/
public Telemetry(UAVTalk utalkIn, UAVObjectManager objMngr, Looper l) {
this.utalk = utalkIn;
this.objMngr = objMngr;
// Create a handler for object messages
handler = new ObjectUpdateHandler(l);
// Process all objects in the list
List<List<UAVObject>> objs = objMngr.getObjects();
ListIterator<List<UAVObject>> li = objs.listIterator();
while (li.hasNext())
registerObject(li.next().get(0)); // we only need to register one
// instance per object type
// Listen to new object creations
objMngr.addNewInstanceObserver(new Observer() {
@Override @Override
public void update(Observable observable, Object data) { public void update(Observable observable, Object data) {
newObject((UAVObject) data); newInstance((UAVObject) data);
} }
}); });
objMngr.addNewObjectObserver(new Observer() {
@Override
public void update(Observable observable, Object data) {
newObject((UAVObject) data);
}
});
// Listen to transaction completions from uavtalk // Listen to transaction completions from uavtalk
utalk.setOnTransactionCompletedListener( utalk.setOnTransactionCompletedListener(utalk.new OnTransactionCompletedListener() {
utalk.new OnTransactionCompletedListener() {
@Override @Override
void TransactionSucceeded(UAVObject data) { void TransactionSucceeded(UAVObject data) {
try { transactionCompleted(data, true);
transactionCompleted(data, true); }
} catch (IOException e) {
// Disconnect when stream fails
utalk.setOnTransactionCompletedListener(null);
}
}
@Override @Override
void TransactionFailed(UAVObject data) { void TransactionFailed(UAVObject data) {
try { if (DEBUG)
if (DEBUG) Log.d(TAG, "TransactionFailed(" + data.getName() + ")"); Log.d(TAG, "TransactionFailed(" + data.getName() + ")");
transactionCompleted(data, false); transactionCompleted(data, false);
} catch (IOException e) {
// Disconnect when stream fails
utalk.setOnTransactionCompletedListener(null);
}
}
});
// Get GCS stats object
gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
// Setup transaction timer
transPending = false;
// Setup and start the periodic timer
timeToNextUpdateMs = 0;
updateTimerSetPeriod(1000);
// Setup and start the stats timer
txErrors = 0;
txRetries = 0;
}
synchronized void transTimerSetPeriod(int periodMs) {
if(transTimerTask != null)
transTimerTask.cancel();
if(transTimer != null)
transTimer.purge();
transTimer = new Timer();
transTimerTask = new TimerTask() {
@Override
public void run() {
try {
transactionTimeout();
} catch (IOException e) {
cancel();
}
} }
};
transTimer.schedule(transTimerTask, periodMs, periodMs);
}
synchronized void updateTimerSetPeriod(int periodMs) { });
if (updateTimer != null) {
updateTimer.cancel(); // Get GCS stats object
updateTimer = null; gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
}
if (updateTimerTask != null) { // Setup transaction timer
updateTimerTask.cancel(); transPending = false;
updateTimerTask = null; // Setup and start the periodic timer
} timeToNextUpdateMs = 0;
updateTimer = new Timer(); updateTimerSetPeriod(1000);
updateTimerTask = new TimerTask() { // Setup and start the stats timer
txErrors = 0;
txRetries = 0;
}
synchronized void updateTimerSetPeriod(int periodMs) {
if (updateTimer != null) {
updateTimer.cancel();
updateTimer = null;
}
if (updateTimerTask != null) {
updateTimerTask.cancel();
updateTimerTask = null;
}
updateTimer = new Timer();
updateTimerTask = new TimerTask() {
@Override @Override
public void run() { public void run() {
try { try {
@ -204,574 +194,561 @@ public class Telemetry {
updateTimer.cancel(); updateTimer.cancel();
} }
} }
}; };
updateTimer.schedule(updateTimerTask, periodMs, periodMs); updateTimer.schedule(updateTimerTask, periodMs, periodMs);
} }
/** /**
* Register a new object for periodic updates (if enabled) * Register a new object for periodic updates (if enabled)
*/ */
private synchronized void registerObject(UAVObject obj) private synchronized void registerObject(UAVObject obj) {
{ // Setup object for periodic updates
// Setup object for periodic updates addObject(obj);
addObject(obj);
// Setup object for telemetry updates // Setup object for telemetry updates
updateObject(obj); updateObject(obj);
} }
/** /**
* Add an object in the list used for periodic updates * Add an object in the list used for periodic updates
*/ */
private synchronized void addObject(UAVObject obj) private synchronized void addObject(UAVObject obj) {
{ // Check if object type is already in the list
// Check if object type is already in the list ListIterator<ObjectTimeInfo> li = objList.listIterator();
ListIterator<ObjectTimeInfo> li = objList.listIterator(); while (li.hasNext()) {
while(li.hasNext()) { ObjectTimeInfo n = li.next();
ObjectTimeInfo n = li.next(); if (n.obj.getObjID() == obj.getObjID()) {
if( n.obj.getObjID() == obj.getObjID() ) // Object type (not instance!) is already in the list, do
{ // nothing
// Object type (not instance!) is already in the list, do nothing return;
return; }
} }
}
// If this point is reached, then the object type is new, let's add it // If this point is reached, then the object type is new, let's add it
ObjectTimeInfo timeInfo = new ObjectTimeInfo(); ObjectTimeInfo timeInfo = new ObjectTimeInfo();
timeInfo.obj = obj; timeInfo.obj = obj;
timeInfo.timeToNextUpdateMs = 0; timeInfo.timeToNextUpdateMs = 0;
timeInfo.updatePeriodMs = 0; timeInfo.updatePeriodMs = 0;
objList.add(timeInfo); objList.add(timeInfo);
} }
/** /**
* Update the object's timers * Update the object's timers
*/ */
private synchronized void setUpdatePeriod(UAVObject obj, int periodMs) private synchronized void setUpdatePeriod(UAVObject obj, int periodMs) {
{ // Find object type (not instance!) and update its period
// Find object type (not instance!) and update its period ListIterator<ObjectTimeInfo> li = objList.listIterator();
ListIterator<ObjectTimeInfo> li = objList.listIterator(); while (li.hasNext()) {
while(li.hasNext()) { ObjectTimeInfo n = li.next();
ObjectTimeInfo n = li.next(); if (n.obj.getObjID() == obj.getObjID()) {
if ( n.obj.getObjID() == obj.getObjID() ) n.updatePeriodMs = periodMs;
{ n.timeToNextUpdateMs = (int) (periodMs * (new java.util.Random())
n.updatePeriodMs = periodMs; .nextDouble()); // avoid bunching of updates
n.timeToNextUpdateMs = (int) (periodMs * (new java.util.Random()).nextDouble()); // avoid bunching of updates }
} }
} }
}
final Observer unpackedObserver = new Observer() { final Observer unpackedObserver = new Observer() {
@Override @Override
public void update(Observable observable, Object data) { public void update(Observable observable, Object data) {
try { handler.unpacked((UAVObject) data);
enqueueObjectUpdates((UAVObject) data, EV_UNPACKED, false, true); }
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}; };
final Observer updatedAutoObserver = new Observer() { final Observer updatedAutoObserver = new Observer() {
@Override @Override
public void update(Observable observable, Object data) { public void update(Observable observable, Object data) {
try { handler.updatedAuto((UAVObject) data);
enqueueObjectUpdates((UAVObject) data, EV_UPDATED, false, true); }
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}; };
final Observer updatedManualObserver = new Observer() { final Observer updatedManualObserver = new Observer() {
@Override @Override
public void update(Observable observable, Object data) { public void update(Observable observable, Object data) {
try { handler.updatedManual((UAVObject) data);
enqueueObjectUpdates((UAVObject) data, EV_UPDATED_MANUAL, false, true); }
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}; };
final Observer updatedRequestedObserver = new Observer() { final Observer updatedRequestedObserver = new Observer() {
@Override @Override
public void update(Observable observable, Object data) { public void update(Observable observable, Object data) {
try { handler.updateRequested((UAVObject) data);
enqueueObjectUpdates((UAVObject) data, EV_UPDATE_REQ, false, true); }
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}; };
/** /**
* Connect to all instances of an object depending on the event mask specified * Connect to all instances of an object depending on the event mask
*/ * specified
private synchronized void connectToObjectInstances(UAVObject obj, int eventMask) */
{ private synchronized void connectToObjectInstances(UAVObject obj,
List<UAVObject> objs = objMngr.getObjectInstances(obj.getObjID()); int eventMask) {
ListIterator<UAVObject> li = objs.listIterator(); List<UAVObject> objs = objMngr.getObjectInstances(obj.getObjID());
while(li.hasNext()) ListIterator<UAVObject> li = objs.listIterator();
{ while (li.hasNext()) {
obj = li.next(); obj = li.next();
// Disconnect all previous observers from telemetry. This is imortant as this can // Disconnect all previous observers from telemetry. This is
// be called multiple times // imortant as this can
obj.removeUnpackedObserver(unpackedObserver); // be called multiple times
obj.removeUpdatedAutoObserver(updatedAutoObserver); obj.removeUnpackedObserver(unpackedObserver);
obj.removeUpdatedManualObserver(updatedManualObserver); obj.removeUpdatedAutoObserver(updatedAutoObserver);
obj.removeUpdateRequestedObserver(updatedRequestedObserver); obj.removeUpdatedManualObserver(updatedManualObserver);
obj.removeUpdateRequestedObserver(updatedRequestedObserver);
// Connect only the selected events // Connect only the selected events
if ( (eventMask&EV_UNPACKED) != 0) if ((eventMask & EV_UNPACKED) != 0)
obj.addUnpackedObserver(unpackedObserver); obj.addUnpackedObserver(unpackedObserver);
if ( (eventMask&EV_UPDATED) != 0) if ((eventMask & EV_UPDATED) != 0)
obj.addUpdatedAutoObserver(updatedAutoObserver); obj.addUpdatedAutoObserver(updatedAutoObserver);
if ( (eventMask&EV_UPDATED_MANUAL) != 0) if ((eventMask & EV_UPDATED_MANUAL) != 0)
obj.addUpdatedManualObserver(updatedManualObserver); obj.addUpdatedManualObserver(updatedManualObserver);
if ( (eventMask&EV_UPDATE_REQ) != 0) if ((eventMask & EV_UPDATE_REQ) != 0)
obj.addUpdateRequestedObserver(updatedRequestedObserver); obj.addUpdateRequestedObserver(updatedRequestedObserver);
} }
} }
/** /**
* Update an object based on its metadata properties * Update an object based on its metadata properties
*/ */
private synchronized void updateObject(UAVObject obj) private void updateObject(UAVObject obj) {
{ // Get metadata
// Get metadata UAVObject.Metadata metadata = obj.getMetadata();
UAVObject.Metadata metadata = obj.getMetadata();
// Setup object depending on update mode // Setup object depending on update mode
int eventMask; int eventMask;
if ( metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_PERIODIC ) if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_PERIODIC) {
{ // Set update period
// Set update period setUpdatePeriod(obj, metadata.gcsTelemetryUpdatePeriod);
setUpdatePeriod(obj, metadata.gcsTelemetryUpdatePeriod); // Connect signals for all instances
// Connect signals for all instances eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ;
eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ; if (obj.isMetadata())
if(obj.isMetadata()) eventMask |= EV_UNPACKED; // we also need to act on remote
eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events) // updates (unpack events)
connectToObjectInstances(obj, eventMask); connectToObjectInstances(obj, eventMask);
} } else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_ONCHANGE) {
else if ( metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_ONCHANGE ) // Set update period
{ setUpdatePeriod(obj, 0);
// Set update period // Connect signals for all instances
setUpdatePeriod(obj, 0); eventMask = EV_UPDATED | EV_UPDATED_MANUAL | EV_UPDATE_REQ;
// Connect signals for all instances if (obj.isMetadata())
eventMask = EV_UPDATED | EV_UPDATED_MANUAL | EV_UPDATE_REQ; eventMask |= EV_UNPACKED; // we also need to act on remote
if(obj.isMetadata()) // updates (unpack events)
eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events)
connectToObjectInstances(obj, eventMask); connectToObjectInstances(obj, eventMask);
} } else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_THROTTLED) {
else if ( metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_THROTTLED ) // TODO
{ } else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_MANUAL) {
// TODO // Set update period
} setUpdatePeriod(obj, 0);
else if ( metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_MANUAL ) // Connect signals for all instances
{ eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ;
// Set update period if (obj.isMetadata())
setUpdatePeriod(obj, 0); eventMask |= EV_UNPACKED; // we also need to act on remote
// Connect signals for all instances // updates (unpack events)
eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ;
if(obj.isMetadata())
eventMask |= EV_UNPACKED; // we also need to act on remote updates (unpack events)
connectToObjectInstances(obj, eventMask); connectToObjectInstances(obj, eventMask);
} }
} }
/** /**
* Called when a transaction is successfully completed (uavtalk event) * Check is any objects are pending for periodic updates TODO: Clean-up
* @throws IOException *
*/ * @throws IOException
private synchronized void transactionCompleted(UAVObject obj, boolean result) throws IOException */
{ private void processPeriodicUpdates() throws IOException {
if (DEBUG) Log.d(TAG,"UAVTalk transactionCompleted");
// Check if there is a pending transaction and the objects match
if ( transPending && transInfo.obj.getObjID() == obj.getObjID() )
{
if (DEBUG) Log.d(TAG,"Telemetry: transaction completed for " + obj.getName());
// Complete transaction
transTimer.cancel();
transPending = false;
//Send signal if (DEBUG)
obj.transactionCompleted(result); Log.d(TAG, "processPeriodicUpdates()");
// Process new object updates from queue // Stop timer
processObjectQueue();
} else
{
if (ERROR) Log.e(TAG,"Error: received a transaction completed when did not expect it.");
transPending = false;
}
}
/** updateTimer.cancel();
* Called when a transaction is not completed within the timeout period (timer event)
* @throws IOException
*/
private synchronized void transactionTimeout() throws IOException
{
if (DEBUG) Log.d(TAG,"Telemetry: transaction timeout.");
transTimer.cancel();
// Proceed only if there is a pending transaction
if ( transPending )
{
// Check if more retries are pending
if (transInfo.retriesRemaining > 0)
{
--transInfo.retriesRemaining;
processObjectTransaction();
++txRetries;
}
else
{
if (ERROR) Log.e(TAG, "Transaction failed for: " + transInfo.obj.getName());
// Terminate transaction. This triggers UAVTalk to send a transaction // Iterate through each object and update its timer, if zero then
// failed signal which will make the next queue entry be processed // transmit object.
// Note this is UAVTalk listener TransactionFailed function and not the // Also calculate smallest delay to next update (will be used for
// object specific transaction failed. // setting timeToNextUpdateMs)
utalk.cancelPendingTransaction(transInfo.obj); int minDelay = MAX_UPDATE_PERIOD_MS;
++txErrors; ObjectTimeInfo objinfo;
} int elapsedMs = 0;
} long startTime;
} int offset;
ListIterator<ObjectTimeInfo> li = objList.listIterator();
while (li.hasNext()) {
objinfo = li.next();
// If object is configured for periodic updates
if (objinfo.updatePeriodMs > 0) {
objinfo.timeToNextUpdateMs -= timeToNextUpdateMs;
// Check if time for the next update
if (objinfo.timeToNextUpdateMs <= 0) {
// Reset timer
offset = (-objinfo.timeToNextUpdateMs)
% objinfo.updatePeriodMs;
objinfo.timeToNextUpdateMs = objinfo.updatePeriodMs
- offset;
// Send object
startTime = System.currentTimeMillis();
/** if (DEBUG) Log.d(TAG, "Manual update: " + objinfo.obj.getName());
* Start an object transaction with UAVTalk, all information is stored in transInfo handler.updatedManual(objinfo.obj);
* @throws IOException // enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL,
*/ // true, false);
private synchronized void processObjectTransaction() throws IOException elapsedMs = (int) (System.currentTimeMillis() - startTime);
{ // Update timeToNextUpdateMs with the elapsed delay of
if (transPending) // sending the object;
{ timeToNextUpdateMs += elapsedMs;
if (DEBUG) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName()); }
// Initiate transaction // Update minimum delay
if (transInfo.objRequest) if (objinfo.timeToNextUpdateMs < minDelay) {
{ minDelay = objinfo.timeToNextUpdateMs;
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances); }
} }
else }
{
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
}
// Start timer if a response is expected
if ( transInfo.objRequest || transInfo.acked )
{
transTimerSetPeriod(REQ_TIMEOUT_MS);
}
else
{
transTimer.cancel();
transPending = false;
}
} else
{
if (ERROR) Log.e(TAG,"Error: inside of processObjectTransaction with no transPending");
}
}
/** // Check if delay for the next update is too short
* Enqueue the event received from an object. This is the main method that handles all the callbacks if (minDelay < MIN_UPDATE_PERIOD_MS) {
* from UAVObjects (due to updates, or update requests) minDelay = MIN_UPDATE_PERIOD_MS;
*/ }
private void enqueueObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) throws IOException
{
// Push event into queue
if (DEBUG) Log.d(TAG, "Push event into queue for obj " + obj.getName() + " event " + event);
if(event == 8 && obj.getName().compareTo("GCSTelemetryStats") == 0)
Thread.dumpStack();
ObjectQueueInfo objInfo = new ObjectQueueInfo();
objInfo.obj = obj;
objInfo.event = event;
objInfo.allInstances = allInstances;
if (priority)
{
// Only enqueue if an identical transaction does not already exist
if(!objPriorityQueue.contains(objInfo)) {
if ( objPriorityQueue.size() < MAX_QUEUE_SIZE )
{
objPriorityQueue.add(objInfo);
}
else
{
++txErrors;
obj.transactionCompleted(false);
Log.w(TAG,"Telemetry: priority event queue is full, event lost " + obj.getName());
}
}
}
else
{
// Only enqueue if an identical transaction does not already exist
if(!objQueue.contains(objInfo)) {
if ( objQueue.size() < MAX_QUEUE_SIZE )
{
objQueue.add(objInfo);
}
else
{
++txErrors;
obj.transactionCompleted(false);
}
}
}
// If there is no transaction in progress then process event // Done
if (!transPending) timeToNextUpdateMs = minDelay;
{
processObjectQueue();
}
}
/** // Restart timer
* Process events from the object queue updateTimerSetPeriod(timeToNextUpdateMs);
* @throws IOException }
*/
private void processObjectQueue() throws IOException
{
if (DEBUG) Log.d(TAG, "Process object queue - Depth " + objQueue.size() + " priority " + objPriorityQueue.size());
// Don nothing if a transaction is already in progress (should not happen) public TelemetryStats getStats() {
if (transPending) // Get UAVTalk stats
{ UAVTalk.ComStats utalkStats = utalk.getStats();
if (WARN) Log.e(TAG,"Dequeue while a transaction pending");
return;
}
// Get object information from queue (first the priority and then the regular queue) // Update stats
ObjectQueueInfo objInfo; TelemetryStats stats = new TelemetryStats();
synchronized (objPriorityQueue) { stats.txBytes = utalkStats.txBytes;
if ( !objPriorityQueue.isEmpty() ) stats.rxBytes = utalkStats.rxBytes;
{ stats.txObjectBytes = utalkStats.txObjectBytes;
objInfo = objPriorityQueue.remove(); stats.rxObjectBytes = utalkStats.rxObjectBytes;
} else { stats.rxObjects = utalkStats.rxObjects;
synchronized (objQueue) { stats.txObjects = utalkStats.txObjects;
if ( !objQueue.isEmpty() ) stats.txErrors = utalkStats.txErrors + txErrors;
{ stats.rxErrors = utalkStats.rxErrors;
objInfo = objQueue.remove(); stats.txRetries = txRetries;
}
else
{
return;
}
}
}
}
// Check if a connection has been established, only process GCSTelemetryStats updates // Done
// (used to establish the connection) return stats;
gcsStatsObj = objMngr.getObject("GCSTelemetryStats"); }
if ( ((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0 )
{
objQueue.clear();
if ( objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID() )
{
if (DEBUG) Log.d(TAG,"transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected.");
objInfo.obj.transactionCompleted(false);
return;
}
}
// Setup transaction (skip if unpack event) public void resetStats() {
if ( objInfo.event != EV_UNPACKED ) utalk.resetStats();
{ txErrors = 0;
UAVObject.Metadata metadata = objInfo.obj.getMetadata(); txRetries = 0;
transInfo.obj = objInfo.obj; }
transInfo.allInstances = objInfo.allInstances;
transInfo.retriesRemaining = MAX_RETRIES;
transInfo.acked = metadata.GetGcsTelemetryAcked();
if ( objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL )
{
transInfo.objRequest = false;
}
else if ( objInfo.event == EV_UPDATE_REQ )
{
transInfo.objRequest = true;
}
// Start transaction
transPending = true;
processObjectTransaction();
} else
{
// qDebug() << QString("Process object queue: this is an unpack event for %1").arg(objInfo.obj->getName());
}
// If this is a metaobject then make necessary telemetry updates private void newObject(UAVObject obj) {
if (objInfo.obj.isMetadata()) registerObject(obj);
{ }
UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj;
updateObject( metaobj.getParentObject() );
}
// The fact we received an unpacked event does not mean that private synchronized void newInstance(UAVObject obj) {
// we do not have additional objects still in the queue, registerObject(obj);
// so we have to reschedule queue processing to make sure they are not }
// stuck:
if ( objInfo.event == EV_UNPACKED && !transPending)
processObjectQueue();
} /**
* Stop all the telemetry timers
/** */
* Check is any objects are pending for periodic updates public void stopTelemetry() {
* TODO: Clean-up if (updateTimerTask != null)
* @throws IOException updateTimerTask.cancel();
*/ updateTimerTask = null;
private synchronized void processPeriodicUpdates() throws IOException if (updateTimer != null)
{ updateTimer.cancel();
updateTimer = null;
if (DEBUG) Log.d(TAG, "processPeriodicUpdates()"); }
// Stop timer
updateTimer.cancel();
// Iterate through each object and update its timer, if zero then transmit object.
// Also calculate smallest delay to next update (will be used for setting timeToNextUpdateMs)
int minDelay = MAX_UPDATE_PERIOD_MS;
ObjectTimeInfo objinfo;
int elapsedMs = 0;
long startTime;
int offset;
ListIterator<ObjectTimeInfo> li = objList.listIterator();
while(li.hasNext())
{
objinfo = li.next();
// If object is configured for periodic updates
if (objinfo.updatePeriodMs > 0)
{
objinfo.timeToNextUpdateMs -= timeToNextUpdateMs;
// Check if time for the next update
if (objinfo.timeToNextUpdateMs <= 0)
{
// Reset timer
offset = (-objinfo.timeToNextUpdateMs) % objinfo.updatePeriodMs;
objinfo.timeToNextUpdateMs = objinfo.updatePeriodMs - offset;
// Send object
startTime = System.currentTimeMillis();
enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false);
elapsedMs = (int) (System.currentTimeMillis() - startTime);
// Update timeToNextUpdateMs with the elapsed delay of sending the object;
timeToNextUpdateMs += elapsedMs;
}
// Update minimum delay
if (objinfo.timeToNextUpdateMs < minDelay)
{
minDelay = objinfo.timeToNextUpdateMs;
}
}
}
// Check if delay for the next update is too short
if (minDelay < MIN_UPDATE_PERIOD_MS)
{
minDelay = MIN_UPDATE_PERIOD_MS;
}
// Done
timeToNextUpdateMs = minDelay;
// Restart timer
updateTimerSetPeriod(timeToNextUpdateMs);
}
public TelemetryStats getStats()
{
// Get UAVTalk stats
UAVTalk.ComStats utalkStats = utalk.getStats();
// Update stats
TelemetryStats stats = new TelemetryStats();
stats.txBytes = utalkStats.txBytes;
stats.rxBytes = utalkStats.rxBytes;
stats.txObjectBytes = utalkStats.txObjectBytes;
stats.rxObjectBytes = utalkStats.rxObjectBytes;
stats.rxObjects = utalkStats.rxObjects;
stats.txObjects = utalkStats.txObjects;
stats.txErrors = utalkStats.txErrors + txErrors;
stats.rxErrors = utalkStats.rxErrors;
stats.txRetries = txRetries;
// Done
return stats;
}
public synchronized void resetStats()
{
utalk.resetStats();
txErrors = 0;
txRetries = 0;
}
private void newObject(UAVObject obj)
{
registerObject(obj);
}
private synchronized void newInstance(UAVObject obj)
{
registerObject(obj);
}
/**
* Stop all the telemetry timers
*/
public void stopTelemetry()
{
if (updateTimerTask != null)
updateTimerTask.cancel();
updateTimerTask = null;
if (updateTimer != null)
updateTimer.cancel();
updateTimer = null;
if (transTimerTask != null)
transTimerTask.cancel();
transTimerTask = null;
if (transTimer != null)
transTimer.cancel();
transTimer = null;
}
/** /**
* Private variables * Private variables
*/ */
private final UAVObjectManager objMngr; private final UAVObjectManager objMngr;
private final UAVTalk utalk; private final UAVTalk utalk;
private UAVObject gcsStatsObj; private UAVObject gcsStatsObj;
private final List<ObjectTimeInfo> objList = new ArrayList<ObjectTimeInfo>(); private final List<ObjectTimeInfo> objList = new ArrayList<ObjectTimeInfo>();
private final Queue<ObjectQueueInfo> objQueue = new ConcurrentLinkedQueue<ObjectQueueInfo>(); private ObjectTransactionInfo transInfo = new ObjectTransactionInfo();
private final Queue<ObjectQueueInfo> objPriorityQueue = new ConcurrentLinkedQueue<ObjectQueueInfo>(); private boolean transPending;
private final ObjectTransactionInfo transInfo = new ObjectTransactionInfo();
private boolean transPending;
private Timer updateTimer; private Timer updateTimer;
private TimerTask updateTimerTask; private TimerTask updateTimerTask;
private Timer transTimer;
private TimerTask transTimerTask;
private int timeToNextUpdateMs; private int timeToNextUpdateMs;
private int txErrors; private int txErrors;
private int txRetries; private int txRetries;
/** /**
* Private constants * Private constants
*/ */
private static final int REQ_TIMEOUT_MS = 250; private static final int REQ_TIMEOUT_MS = 250;
private static final int MAX_RETRIES = 2; private static final int MAX_RETRIES = 2;
private static final int MAX_UPDATE_PERIOD_MS = 1000; private static final int MAX_UPDATE_PERIOD_MS = 1000;
private static final int MIN_UPDATE_PERIOD_MS = 1; private static final int MIN_UPDATE_PERIOD_MS = 1;
private static final int MAX_QUEUE_SIZE = 20;
static private ObjectUpdateHandler handler;
//! Accessor for the object updated handler
ObjectUpdateHandler getHandler() { return handler; }
/**
* Handler which posts all the messages for individual object updates
*/
public class ObjectUpdateHandler extends Handler {
// ! This can only be created while attaching to a particular looper
ObjectUpdateHandler(Looper l) {
super(l);
}
Queue<ObjectQueueInfo> objQueue = new ConcurrentLinkedQueue<ObjectQueueInfo>();
// ! Generic enqueue
void enqueueObjectUpdates(UAVObject obj, int event,
boolean allInstances, boolean priority) {
if (DEBUG) Log.d(TAG, "Enqueing update " + obj.getName() + " event " + event);
ObjectQueueInfo objInfo = new ObjectQueueInfo();
objInfo.obj = obj;
objInfo.event = event;
objInfo.allInstances = allInstances;
// For now maintain a list of objects in the queue so we don't add duplicates
// later we should make the runnables static to each class so we can use removeCallback
synchronized(objQueue) {
if (objQueue.contains(objInfo)) {
if (WARN) Log.w(TAG, "Found previously scheduled queue element: " + objInfo.obj.getName());
} else {
objQueue.add(objInfo);
post(new ObjectRunnable(objInfo));
}
}
}
public boolean removeActivatedQueue(ObjectQueueInfo objInfo) {
synchronized(objQueue) {
if (objQueue.remove(objInfo)) {
if (WARN) Log.w(TAG, "Unable to find queue element to remove");
return false;
}
}
return true;
}
// ! Enqueue an unpacked event
void unpacked(UAVObject obj) {
enqueueObjectUpdates(obj, EV_UNPACKED, false, true);
}
// ! Enqueue an updated auto event
void updatedAuto(UAVObject obj) {
enqueueObjectUpdates(obj, EV_UPDATED, false, true);
}
// ! Enqueue an updated manual event
void updatedManual(UAVObject obj) {
enqueueObjectUpdates(obj, EV_UPDATED_MANUAL, false, true);
}
// ! Enqueue an update requested event
void updateRequested(UAVObject obj) {
enqueueObjectUpdates(obj, EV_UPDATE_REQ, false, true);
}
}
/**
* Perform an update on an object where on an event based on the contents provided
* to the constructors. This update will also set a timeout for transaction failure.
*/
class ObjectRunnable implements Runnable {
// ! Transaction information to perform
private final ObjectQueueInfo objInfo;
ObjectRunnable(ObjectQueueInfo info) {
Assert.assertNotNull(info);
objInfo = info;
}
// ! Perform the transaction on the looper thread
@Override
public void run() {
if (DEBUG) Log.d(TAG, "Object transaction running. Event:" + objInfo.event);
// 1. Check GCS is connected, throw this out if not
// 2. Set up a transaction which includes multiple retries, whether
// to wait for ack etc
// 3. Send UAVTalk message
// 4. Based on transaction type either wait for update or end
// 1. Check if a connection has been established, only process
// GCSTelemetryStats updates
// (used to establish the connection)
gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
if (((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0) {
if (objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID()) {
if (DEBUG)
Log.d(TAG, "transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected.");
objInfo.obj.transactionCompleted(false);
return;
}
}
// If this is a metaobject then make necessary telemetry updates
// (this is why we catch unpack)
if (objInfo.obj.isMetadata()) {
UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj;
updateObject(metaobj.getParentObject());
}
// 2. Setup transaction (skip if unpack event)
ObjectTransactionInfo newTrans = new ObjectTransactionInfo();
boolean newTransactionPending = false;
if (objInfo.event != EV_UNPACKED) {
UAVObject.Metadata metadata = objInfo.obj.getMetadata();
newTrans.obj = objInfo.obj;
newTrans.allInstances = objInfo.allInstances;
newTrans.retriesRemaining = MAX_RETRIES;
newTrans.acked = metadata.GetGcsTelemetryAcked();
if (objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL) {
newTrans.objRequest = false;
} else if (objInfo.event == EV_UPDATE_REQ) {
newTrans.objRequest = true;
}
// Determine if this will schedule a new transaction
newTransactionPending = (newTrans.objRequest || newTrans.acked);
synchronized (transInfo) {
// If there is a transaction pending and this would set up a new one reschedule it
if (transPending && newTransactionPending) {
if (WARN) Log.w(TAG, "Postponing transaction for" + newTrans.obj.getName() + " existing transaction for " + transInfo.obj.getName());
handler.postDelayed(this, 100);
return;
}
// Store this as the active transaction
transPending = newTransactionPending;
transInfo = newTrans;
if (DEBUG) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName());
// Remove this one from the list of pending transactions
handler.removeActivatedQueue(objInfo);
try {
// 3. Execute transaction by sending the appropriate UAVTalk command
if (transInfo.objRequest) {
if (DEBUG) Log.d(TAG, "Sending object request " + transInfo.obj.getName());
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
} else {
if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName());
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
}
} catch (IOException e) {
if (ERROR) Log.e(TAG, "Unable to send UAVTalk message");
e.printStackTrace();
}
// Post a timeout timer if a response is epxected
if (transPending)
handler.postDelayed(transactionTimeout, REQ_TIMEOUT_MS);
}
}
}
}
/**
* Runnable posted to handle a timeout of a transaction. Tracks the number of retry attempts
* retries that many, and finally sends a transaction failed signal.
*/
final Runnable transactionTimeout = new Runnable() {
@Override
public void run() {
// Lock on the transaction
synchronized (transInfo) {
// Proceed only if there is a pending transaction
if (!transPending) {
if (WARN) Log.w(TAG,"Transaction completed but timeout still called. Probable race condition");
return;
}
if (DEBUG) Log.d(TAG, "Telemetry: transaction timeout.");
// Check if more retries are pending
if (transInfo.retriesRemaining > 0) {
--transInfo.retriesRemaining;
// Repeat whatever is required for this transaction type
// (transInfo.objRequest) {
if (DEBUG) Log.d(TAG, "Sending object request");
try {
// Execute transaction by sending the appropriate UAVTalk command
if (transInfo.objRequest) {
if (DEBUG) Log.d(TAG, "Sending object request" + transInfo.obj.getName());
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
} else {
if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName());
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
}
} catch (IOException e) {
if (ERROR) Log.e(TAG, "Unable to send UAVTalk message");
e.printStackTrace();
}
handler.postDelayed(transactionTimeout, REQ_TIMEOUT_MS);
++txRetries;
} else {
if (ERROR) Log.e(TAG, "Transaction failed for: " + transInfo.obj.getName());
// Terminate transaction. This triggers UAVTalk to send a transaction
// failed signal which will make the next queue entry be processed
// Note this is UAVTalk listener TransactionFailed function
// object specific transaction failed.
utalk.cancelPendingTransaction(transInfo.obj);
++txErrors;
}
}
}
};
/**
* Called when a transaction is successfully completed (UAVTalk event) and maps that to
* the appropriate object event as well as canceling the pending transaction and timeout
*/
private void transactionCompleted(UAVObject obj, boolean result) {
if (DEBUG) Log.d(TAG, "UAVTalk transactionCompleted");
// Check if there is a pending transaction and the objects match
synchronized(transInfo) {
if (transPending && transInfo.obj.getObjID() == obj.getObjID()) {
if (DEBUG) Log.d(TAG, "Telemetry: transaction completed for " + obj.getName());
// Cancel timeout and complete transaction
handler.removeCallbacks(transactionTimeout);
transPending = false;
//Send signal
obj.transactionCompleted(result);
} else {
if (ERROR) Log.e(TAG, "Error: received a transaction completed when did not expect it.");
transPending = false;
}
}
}
} }

View File

@ -34,6 +34,8 @@ import java.util.Observer;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import org.openpilot.androidgcs.telemetry.OPTelemetryService;
import android.util.Log; import android.util.Log;
public class TelemetryMonitor extends Observable { public class TelemetryMonitor extends Observable {
@ -60,6 +62,7 @@ public class TelemetryMonitor extends Observable {
private long lastUpdateTime; private long lastUpdateTime;
private final List<UAVObject> queue; private final List<UAVObject> queue;
private OPTelemetryService telemService;
private boolean connected = false; private boolean connected = false;
private boolean objects_updated = false; private boolean objects_updated = false;
@ -71,6 +74,11 @@ public class TelemetryMonitor extends Observable {
return objects_updated; return objects_updated;
}; };
public TelemetryMonitor(UAVObjectManager objMngr, Telemetry tel, OPTelemetryService s) {
this(objMngr, tel);
telemService = s;
}
public TelemetryMonitor(UAVObjectManager objMngr, Telemetry tel) { public TelemetryMonitor(UAVObjectManager objMngr, Telemetry tel) {
this.objMngr = objMngr; this.objMngr = objMngr;
this.tel = tel; this.tel = tel;
@ -171,7 +179,9 @@ public class TelemetryMonitor extends Observable {
public synchronized void retrieveNextObject() throws IOException { public synchronized void retrieveNextObject() throws IOException {
// If queue is empty return // If queue is empty return
if (queue.isEmpty()) { if (queue.isEmpty()) {
if (DEBUG || true) Log.d(TAG, "All objects retrieved: Connected Successfully"); if (telemService != null)
telemService.toastMessage("Connected");
if (DEBUG) Log.d(TAG, "All objects retrieved: Connected Successfully");
objects_updated = true; objects_updated = true;
if (!HANDSHAKE_IS_CONNECTED) { if (!HANDSHAKE_IS_CONNECTED) {
setChanged(); setChanged();

View File

@ -1,6 +1,6 @@
package org.openpilot.uavtalk; package org.openpilot.uavtalk;
import static org.junit.Assert.*; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
@ -8,11 +8,12 @@ import java.net.Socket;
import org.junit.Test; import org.junit.Test;
import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize; import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize;
import org.openpilot.uavtalk.UAVTalk;
import android.os.Looper;
public class TelemetryMonitorTest { public class TelemetryMonitorTest {
static UAVObjectManager objMngr; static UAVObjectManager objMngr;
static UAVTalk talk; static UAVTalk talk;
static final String IP_ADDRDESS = new String("127.0.0.1"); static final String IP_ADDRDESS = new String("127.0.0.1");
@ -32,7 +33,7 @@ public class TelemetryMonitorTest {
e.printStackTrace(); e.printStackTrace();
fail("Couldn't connect to test platform"); fail("Couldn't connect to test platform");
} }
try { try {
talk = new UAVTalk(connection.getInputStream(), connection.getOutputStream(), objMngr); talk = new UAVTalk(connection.getInputStream(), connection.getOutputStream(), objMngr);
} catch (IOException e) { } catch (IOException e) {
@ -40,16 +41,17 @@ public class TelemetryMonitorTest {
e.printStackTrace(); e.printStackTrace();
fail("Couldn't construct UAVTalk object"); fail("Couldn't construct UAVTalk object");
} }
Thread inputStream = talk.getInputProcessThread(); Thread inputStream = talk.getInputProcessThread();
inputStream.start(); inputStream.start();
Telemetry tel = new Telemetry(talk, objMngr); Looper.prepare();
Telemetry tel = new Telemetry(talk, objMngr, Looper.myLooper());
@SuppressWarnings("unused") @SuppressWarnings("unused")
TelemetryMonitor mon = new TelemetryMonitor(objMngr,tel); TelemetryMonitor mon = new TelemetryMonitor(objMngr,tel);
Thread.sleep(10000); Thread.sleep(10000);
System.out.println("Done"); System.out.println("Done");
} }