mirror of
https://bitbucket.org/librepilot/librepilot.git
synced 2024-11-30 08:24:11 +01:00
AndroidGCS: Start moving the telemetry object queue to a handler and a looper
This commit is contained in:
parent
2f65952952
commit
2d7bb4d3bb
@ -26,16 +26,10 @@
|
||||
*/
|
||||
package org.openpilot.androidgcs.telemetry;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.Observable;
|
||||
import java.util.Observer;
|
||||
|
||||
import org.openpilot.uavtalk.Telemetry;
|
||||
import org.openpilot.uavtalk.TelemetryMonitor;
|
||||
import org.openpilot.uavtalk.UAVDataObject;
|
||||
import org.openpilot.uavtalk.UAVObjectManager;
|
||||
import org.openpilot.uavtalk.UAVTalk;
|
||||
import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize;
|
||||
|
||||
import android.app.Service;
|
||||
@ -127,7 +121,7 @@ public class OPTelemetryService extends Service {
|
||||
break;
|
||||
case 2:
|
||||
Toast.makeText(getApplicationContext(), "Attempting BT connection", Toast.LENGTH_SHORT).show();
|
||||
activeTelem = new BTTelemetryThread();
|
||||
//activeTelem = new BTTelemetryThread();
|
||||
break;
|
||||
case 3:
|
||||
Toast.makeText(getApplicationContext(), "Attempting TCP connection", Toast.LENGTH_SHORT).show();
|
||||
@ -347,6 +341,8 @@ public class OPTelemetryService extends Service {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
private class BTTelemetryThread extends Thread implements TelemTask {
|
||||
|
||||
private final UAVObjectManager objMngr;
|
||||
@ -401,7 +397,7 @@ public class OPTelemetryService extends Service {
|
||||
@Override
|
||||
public void update(Observable arg0, Object arg1) {
|
||||
if (DEBUG) Log.d(TAG, "Mon updated. Connected: " + mon.getConnected() + " objects updated: " + mon.getObjectsUpdated());
|
||||
if(mon.getConnected() /*&& mon.getObjectsUpdated()*/) {
|
||||
if(mon.getConnected() ) {
|
||||
Intent intent = new Intent();
|
||||
intent.setAction(INTENT_ACTION_CONNECTED);
|
||||
sendBroadcast(intent,null);
|
||||
@ -423,6 +419,5 @@ public class OPTelemetryService extends Service {
|
||||
}
|
||||
if (DEBUG) Log.d(TAG, "UAVTalk stream disconnected");
|
||||
}
|
||||
|
||||
};
|
||||
};*/
|
||||
}
|
||||
|
@ -105,7 +105,7 @@ public abstract class TelemetryTask implements Runnable {
|
||||
// Create the required telemetry objects attached to this
|
||||
// data stream
|
||||
uavTalk = new UAVTalk(inStream, outStream, objMngr);
|
||||
tel = new Telemetry(uavTalk, objMngr);
|
||||
tel = new Telemetry(uavTalk, objMngr, Looper.myLooper());
|
||||
mon = new TelemetryMonitor(objMngr,tel, telemService);
|
||||
|
||||
// Create an observer to notify system of connection
|
||||
|
@ -35,9 +35,17 @@ import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import android.os.Handler;
|
||||
import android.os.Looper;
|
||||
import android.util.Log;
|
||||
|
||||
public class Telemetry {
|
||||
/**
|
||||
* Telemetry provides a messaging handler to handle all the object updates and transfer
|
||||
* requests. This handler can either be attached to a new loop attached to the thread
|
||||
* started by the telemetry service.
|
||||
*/
|
||||
|
||||
private final String TAG = "Telemetry";
|
||||
public static int LOGLEVEL = 1;
|
||||
@ -98,11 +106,14 @@ public class Telemetry {
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public Telemetry(UAVTalk utalkIn, UAVObjectManager objMngr)
|
||||
public Telemetry(UAVTalk utalkIn, UAVObjectManager objMngr, Looper l)
|
||||
{
|
||||
this.utalk = utalkIn;
|
||||
this.objMngr = objMngr;
|
||||
|
||||
// Create a handler for object messages
|
||||
handler = new ObjectUpdateHandler(l);
|
||||
|
||||
// Process all objects in the list
|
||||
List< List<UAVObject> > objs = objMngr.getObjects();
|
||||
ListIterator<List<UAVObject>> li = objs.listIterator();
|
||||
@ -265,48 +276,28 @@ public class Telemetry {
|
||||
final Observer unpackedObserver = new Observer() {
|
||||
@Override
|
||||
public void update(Observable observable, Object data) {
|
||||
try {
|
||||
enqueueObjectUpdates((UAVObject) data, EV_UNPACKED, false, true);
|
||||
} catch (IOException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
handler.unpacked((UAVObject) data);
|
||||
}
|
||||
};
|
||||
|
||||
final Observer updatedAutoObserver = new Observer() {
|
||||
@Override
|
||||
public void update(Observable observable, Object data) {
|
||||
try {
|
||||
enqueueObjectUpdates((UAVObject) data, EV_UPDATED, false, true);
|
||||
} catch (IOException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
handler.updatedAuto((UAVObject) data);
|
||||
}
|
||||
};
|
||||
|
||||
final Observer updatedManualObserver = new Observer() {
|
||||
@Override
|
||||
public void update(Observable observable, Object data) {
|
||||
try {
|
||||
enqueueObjectUpdates((UAVObject) data, EV_UPDATED_MANUAL, false, true);
|
||||
} catch (IOException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
handler.updatedManual((UAVObject) data);
|
||||
}
|
||||
};
|
||||
|
||||
final Observer updatedRequestedObserver = new Observer() {
|
||||
@Override
|
||||
public void update(Observable observable, Object data) {
|
||||
try {
|
||||
enqueueObjectUpdates((UAVObject) data, EV_UPDATE_REQ, false, true);
|
||||
} catch (IOException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
handler.updateRequested((UAVObject) data);
|
||||
}
|
||||
};
|
||||
|
||||
@ -488,59 +479,6 @@ public class Telemetry {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueue the event received from an object. This is the main method that handles all the callbacks
|
||||
* from UAVObjects (due to updates, or update requests)
|
||||
*/
|
||||
private void enqueueObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) throws IOException
|
||||
{
|
||||
// Push event into queue
|
||||
if (DEBUG) Log.d(TAG, "Push event into queue for obj " + obj.getName() + " event " + event);
|
||||
if(event == 8 && obj.getName().compareTo("GCSTelemetryStats") == 0)
|
||||
Thread.dumpStack();
|
||||
ObjectQueueInfo objInfo = new ObjectQueueInfo();
|
||||
objInfo.obj = obj;
|
||||
objInfo.event = event;
|
||||
objInfo.allInstances = allInstances;
|
||||
if (priority)
|
||||
{
|
||||
// Only enqueue if an identical transaction does not already exist
|
||||
if(!objPriorityQueue.contains(objInfo)) {
|
||||
if ( objPriorityQueue.size() < MAX_QUEUE_SIZE )
|
||||
{
|
||||
objPriorityQueue.add(objInfo);
|
||||
}
|
||||
else
|
||||
{
|
||||
++txErrors;
|
||||
obj.transactionCompleted(false);
|
||||
Log.w(TAG,"Telemetry: priority event queue is full, event lost " + obj.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Only enqueue if an identical transaction does not already exist
|
||||
if(!objQueue.contains(objInfo)) {
|
||||
if ( objQueue.size() < MAX_QUEUE_SIZE )
|
||||
{
|
||||
objQueue.add(objInfo);
|
||||
}
|
||||
else
|
||||
{
|
||||
++txErrors;
|
||||
obj.transactionCompleted(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If there is no transaction in progress then process event
|
||||
if (!transPending)
|
||||
{
|
||||
processObjectQueue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process events from the object queue
|
||||
* @throws IOException
|
||||
@ -666,7 +604,8 @@ public class Telemetry {
|
||||
objinfo.timeToNextUpdateMs = objinfo.updatePeriodMs - offset;
|
||||
// Send object
|
||||
startTime = System.currentTimeMillis();
|
||||
enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false);
|
||||
handler.updatedManual(objinfo.obj);
|
||||
//enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false);
|
||||
elapsedMs = (int) (System.currentTimeMillis() - startTime);
|
||||
// Update timeToNextUpdateMs with the elapsed delay of sending the object;
|
||||
timeToNextUpdateMs += elapsedMs;
|
||||
@ -780,6 +719,147 @@ public class Telemetry {
|
||||
private static final int MIN_UPDATE_PERIOD_MS = 1;
|
||||
private static final int MAX_QUEUE_SIZE = 20;
|
||||
|
||||
private final ObjectUpdateHandler handler;
|
||||
|
||||
public class ObjectUpdateHandler extends Handler {
|
||||
|
||||
//! This can only be created while attaching to a particular looper
|
||||
ObjectUpdateHandler(Looper l) {
|
||||
super(l);
|
||||
}
|
||||
|
||||
//! Generic enqueue
|
||||
void enqueueObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) {
|
||||
|
||||
if (DEBUG) Log.d(TAG, "Enqueing update " + obj.getName() + " event " + event);
|
||||
|
||||
ObjectQueueInfo objInfo = new ObjectQueueInfo();
|
||||
objInfo.obj = obj;
|
||||
objInfo.event = event;
|
||||
objInfo.allInstances = allInstances;
|
||||
|
||||
post(new ObjectRunnable(objInfo));
|
||||
}
|
||||
|
||||
//! Enqueue an unpacked event
|
||||
void unpacked(UAVObject obj) {
|
||||
enqueueObjectUpdates(obj, EV_UNPACKED, false, true);
|
||||
}
|
||||
|
||||
//! Enqueue an updated auto event
|
||||
void updatedAuto(UAVObject obj) {
|
||||
enqueueObjectUpdates(obj,EV_UPDATED, false, true);
|
||||
}
|
||||
|
||||
//! Enqueue an updated manual event
|
||||
void updatedManual(UAVObject obj) {
|
||||
enqueueObjectUpdates(obj, EV_UPDATE_REQ, false, true);
|
||||
}
|
||||
|
||||
//! Enqueue an update requested event
|
||||
void updateRequested(UAVObject obj) {
|
||||
enqueueObjectUpdates(obj, EV_UPDATE_REQ, false, true);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ObjectRunnable implements Runnable {
|
||||
|
||||
//! Transaction information to perform
|
||||
private final ObjectQueueInfo objInfo;
|
||||
// private final ObjectTransactionInfo transInfo = new ObjectTransactionInfo();
|
||||
|
||||
ObjectRunnable(ObjectQueueInfo info) {
|
||||
Assert.assertNotNull(info);
|
||||
objInfo = info;
|
||||
}
|
||||
|
||||
//! Perform the transaction on the looper thread
|
||||
@Override
|
||||
public void run () {
|
||||
Log.d(TAG,"object transaction running");
|
||||
// 1. Check GCS is connected, throw this out if not
|
||||
// 2. Set up a transaction which includes multiple retries, whether to wait for ack etc
|
||||
// 3. Send UAVTalk message
|
||||
// 4. Based on transaction type either wait for update or end
|
||||
|
||||
// 1. Check if a connection has been established, only process GCSTelemetryStats updates
|
||||
// (used to establish the connection)
|
||||
gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
|
||||
if ( ((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0 )
|
||||
{
|
||||
if ( objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID() )
|
||||
{
|
||||
if (DEBUG) Log.d(TAG,"transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected.");
|
||||
objInfo.obj.transactionCompleted(false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
Log.e(TAG, "A");
|
||||
// 2. Setup transaction (skip if unpack event)
|
||||
if ( objInfo.event != EV_UNPACKED )
|
||||
{
|
||||
Log.e(TAG, "A1");
|
||||
UAVObject.Metadata metadata = objInfo.obj.getMetadata();
|
||||
transInfo.obj = objInfo.obj;
|
||||
transInfo.allInstances = objInfo.allInstances;
|
||||
transInfo.retriesRemaining = MAX_RETRIES;
|
||||
transInfo.acked = metadata.GetGcsTelemetryAcked();
|
||||
if ( objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL )
|
||||
{
|
||||
transInfo.objRequest = false;
|
||||
}
|
||||
else if ( objInfo.event == EV_UPDATE_REQ )
|
||||
{
|
||||
transInfo.objRequest = true;
|
||||
}
|
||||
// Start transaction
|
||||
transPending = true;
|
||||
}
|
||||
Log.e(TAG, "B");
|
||||
// If this is a metaobject then make necessary telemetry updates (this is why we catch unpack)
|
||||
if (objInfo.obj.isMetadata())
|
||||
{
|
||||
UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj;
|
||||
updateObject( metaobj.getParentObject() );
|
||||
}
|
||||
Log.e(TAG, "C");
|
||||
// 3. Execute transaction
|
||||
if (transPending)
|
||||
{
|
||||
Log.e(TAG, "D");
|
||||
try {
|
||||
if (DEBUG || true) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName());
|
||||
// Initiate transaction
|
||||
if (transInfo.objRequest)
|
||||
{
|
||||
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
|
||||
}
|
||||
else
|
||||
{
|
||||
Log.d(TAG, "Sending object");
|
||||
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
|
||||
}
|
||||
|
||||
// TODO: Block if request expected (??)
|
||||
if ( transInfo.objRequest || transInfo.acked )
|
||||
{
|
||||
transTimerSetPeriod(REQ_TIMEOUT_MS);
|
||||
}
|
||||
else
|
||||
{
|
||||
synchronized(transTimer) {
|
||||
transTimer.cancel();
|
||||
transPending = false;
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// TODO Auto-generated catch block
|
||||
Log.e(TAG, "E");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
package org.openpilot.uavtalk;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
@ -8,11 +8,12 @@ import java.net.Socket;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize;
|
||||
import org.openpilot.uavtalk.UAVTalk;
|
||||
|
||||
import android.os.Looper;
|
||||
|
||||
|
||||
public class TelemetryMonitorTest {
|
||||
|
||||
|
||||
static UAVObjectManager objMngr;
|
||||
static UAVTalk talk;
|
||||
static final String IP_ADDRDESS = new String("127.0.0.1");
|
||||
@ -32,7 +33,7 @@ public class TelemetryMonitorTest {
|
||||
e.printStackTrace();
|
||||
fail("Couldn't connect to test platform");
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
talk = new UAVTalk(connection.getInputStream(), connection.getOutputStream(), objMngr);
|
||||
} catch (IOException e) {
|
||||
@ -40,16 +41,17 @@ public class TelemetryMonitorTest {
|
||||
e.printStackTrace();
|
||||
fail("Couldn't construct UAVTalk object");
|
||||
}
|
||||
|
||||
|
||||
Thread inputStream = talk.getInputProcessThread();
|
||||
inputStream.start();
|
||||
|
||||
Telemetry tel = new Telemetry(talk, objMngr);
|
||||
|
||||
Looper.prepare();
|
||||
Telemetry tel = new Telemetry(talk, objMngr, Looper.myLooper());
|
||||
@SuppressWarnings("unused")
|
||||
TelemetryMonitor mon = new TelemetryMonitor(objMngr,tel);
|
||||
|
||||
|
||||
Thread.sleep(10000);
|
||||
|
||||
|
||||
System.out.println("Done");
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user