1
0
mirror of https://bitbucket.org/librepilot/librepilot.git synced 2024-11-29 07:24:13 +01:00

AndroidGCS: Start moving the telemetry object queue to a handler and a looper

This commit is contained in:
James Cotton 2012-08-13 15:02:15 -05:00
parent d5c1e3578e
commit 319baa9e6f
4 changed files with 176 additions and 99 deletions

View File

@ -26,16 +26,10 @@
*/
package org.openpilot.androidgcs.telemetry;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Observable;
import java.util.Observer;
import org.openpilot.uavtalk.Telemetry;
import org.openpilot.uavtalk.TelemetryMonitor;
import org.openpilot.uavtalk.UAVDataObject;
import org.openpilot.uavtalk.UAVObjectManager;
import org.openpilot.uavtalk.UAVTalk;
import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize;
import android.app.Service;
@ -127,7 +121,7 @@ public class OPTelemetryService extends Service {
break;
case 2:
Toast.makeText(getApplicationContext(), "Attempting BT connection", Toast.LENGTH_SHORT).show();
activeTelem = new BTTelemetryThread();
//activeTelem = new BTTelemetryThread();
break;
case 3:
Toast.makeText(getApplicationContext(), "Attempting TCP connection", Toast.LENGTH_SHORT).show();
@ -347,6 +341,8 @@ public class OPTelemetryService extends Service {
}
}
}
/*
private class BTTelemetryThread extends Thread implements TelemTask {
private final UAVObjectManager objMngr;
@ -401,7 +397,7 @@ public class OPTelemetryService extends Service {
@Override
public void update(Observable arg0, Object arg1) {
if (DEBUG) Log.d(TAG, "Mon updated. Connected: " + mon.getConnected() + " objects updated: " + mon.getObjectsUpdated());
if(mon.getConnected() /*&& mon.getObjectsUpdated()*/) {
if(mon.getConnected() ) {
Intent intent = new Intent();
intent.setAction(INTENT_ACTION_CONNECTED);
sendBroadcast(intent,null);
@ -423,6 +419,5 @@ public class OPTelemetryService extends Service {
}
if (DEBUG) Log.d(TAG, "UAVTalk stream disconnected");
}
};
};*/
}

View File

@ -105,7 +105,7 @@ public abstract class TelemetryTask implements Runnable {
// Create the required telemetry objects attached to this
// data stream
uavTalk = new UAVTalk(inStream, outStream, objMngr);
tel = new Telemetry(uavTalk, objMngr);
tel = new Telemetry(uavTalk, objMngr, Looper.myLooper());
mon = new TelemetryMonitor(objMngr,tel, telemService);
// Create an observer to notify system of connection

View File

@ -35,9 +35,17 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import junit.framework.Assert;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
public class Telemetry {
/**
* Telemetry provides a messaging handler to handle all the object updates and transfer
* requests. This handler can either be attached to a new loop attached to the thread
* started by the telemetry service.
*/
private final String TAG = "Telemetry";
public static int LOGLEVEL = 1;
@ -98,11 +106,14 @@ public class Telemetry {
/**
* Constructor
*/
public Telemetry(UAVTalk utalkIn, UAVObjectManager objMngr)
public Telemetry(UAVTalk utalkIn, UAVObjectManager objMngr, Looper l)
{
this.utalk = utalkIn;
this.objMngr = objMngr;
// Create a handler for object messages
handler = new ObjectUpdateHandler(l);
// Process all objects in the list
List< List<UAVObject> > objs = objMngr.getObjects();
ListIterator<List<UAVObject>> li = objs.listIterator();
@ -265,48 +276,28 @@ public class Telemetry {
final Observer unpackedObserver = new Observer() {
@Override
public void update(Observable observable, Object data) {
try {
enqueueObjectUpdates((UAVObject) data, EV_UNPACKED, false, true);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
handler.unpacked((UAVObject) data);
}
};
final Observer updatedAutoObserver = new Observer() {
@Override
public void update(Observable observable, Object data) {
try {
enqueueObjectUpdates((UAVObject) data, EV_UPDATED, false, true);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
handler.updatedAuto((UAVObject) data);
}
};
final Observer updatedManualObserver = new Observer() {
@Override
public void update(Observable observable, Object data) {
try {
enqueueObjectUpdates((UAVObject) data, EV_UPDATED_MANUAL, false, true);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
handler.updatedManual((UAVObject) data);
}
};
final Observer updatedRequestedObserver = new Observer() {
@Override
public void update(Observable observable, Object data) {
try {
enqueueObjectUpdates((UAVObject) data, EV_UPDATE_REQ, false, true);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
handler.updateRequested((UAVObject) data);
}
};
@ -488,59 +479,6 @@ public class Telemetry {
}
}
/**
* Enqueue the event received from an object. This is the main method that handles all the callbacks
* from UAVObjects (due to updates, or update requests)
*/
private void enqueueObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) throws IOException
{
// Push event into queue
if (DEBUG) Log.d(TAG, "Push event into queue for obj " + obj.getName() + " event " + event);
if(event == 8 && obj.getName().compareTo("GCSTelemetryStats") == 0)
Thread.dumpStack();
ObjectQueueInfo objInfo = new ObjectQueueInfo();
objInfo.obj = obj;
objInfo.event = event;
objInfo.allInstances = allInstances;
if (priority)
{
// Only enqueue if an identical transaction does not already exist
if(!objPriorityQueue.contains(objInfo)) {
if ( objPriorityQueue.size() < MAX_QUEUE_SIZE )
{
objPriorityQueue.add(objInfo);
}
else
{
++txErrors;
obj.transactionCompleted(false);
Log.w(TAG,"Telemetry: priority event queue is full, event lost " + obj.getName());
}
}
}
else
{
// Only enqueue if an identical transaction does not already exist
if(!objQueue.contains(objInfo)) {
if ( objQueue.size() < MAX_QUEUE_SIZE )
{
objQueue.add(objInfo);
}
else
{
++txErrors;
obj.transactionCompleted(false);
}
}
}
// If there is no transaction in progress then process event
if (!transPending)
{
processObjectQueue();
}
}
/**
* Process events from the object queue
* @throws IOException
@ -666,7 +604,8 @@ public class Telemetry {
objinfo.timeToNextUpdateMs = objinfo.updatePeriodMs - offset;
// Send object
startTime = System.currentTimeMillis();
enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false);
handler.updatedManual(objinfo.obj);
//enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL, true, false);
elapsedMs = (int) (System.currentTimeMillis() - startTime);
// Update timeToNextUpdateMs with the elapsed delay of sending the object;
timeToNextUpdateMs += elapsedMs;
@ -780,6 +719,147 @@ public class Telemetry {
private static final int MIN_UPDATE_PERIOD_MS = 1;
private static final int MAX_QUEUE_SIZE = 20;
private final ObjectUpdateHandler handler;
public class ObjectUpdateHandler extends Handler {
//! This can only be created while attaching to a particular looper
ObjectUpdateHandler(Looper l) {
super(l);
}
//! Generic enqueue
void enqueueObjectUpdates(UAVObject obj, int event, boolean allInstances, boolean priority) {
if (DEBUG) Log.d(TAG, "Enqueing update " + obj.getName() + " event " + event);
ObjectQueueInfo objInfo = new ObjectQueueInfo();
objInfo.obj = obj;
objInfo.event = event;
objInfo.allInstances = allInstances;
post(new ObjectRunnable(objInfo));
}
//! Enqueue an unpacked event
void unpacked(UAVObject obj) {
enqueueObjectUpdates(obj, EV_UNPACKED, false, true);
}
//! Enqueue an updated auto event
void updatedAuto(UAVObject obj) {
enqueueObjectUpdates(obj,EV_UPDATED, false, true);
}
//! Enqueue an updated manual event
void updatedManual(UAVObject obj) {
enqueueObjectUpdates(obj, EV_UPDATE_REQ, false, true);
}
//! Enqueue an update requested event
void updateRequested(UAVObject obj) {
enqueueObjectUpdates(obj, EV_UPDATE_REQ, false, true);
}
}
class ObjectRunnable implements Runnable {
//! Transaction information to perform
private final ObjectQueueInfo objInfo;
// private final ObjectTransactionInfo transInfo = new ObjectTransactionInfo();
ObjectRunnable(ObjectQueueInfo info) {
Assert.assertNotNull(info);
objInfo = info;
}
//! Perform the transaction on the looper thread
@Override
public void run () {
Log.d(TAG,"object transaction running");
// 1. Check GCS is connected, throw this out if not
// 2. Set up a transaction which includes multiple retries, whether to wait for ack etc
// 3. Send UAVTalk message
// 4. Based on transaction type either wait for update or end
// 1. Check if a connection has been established, only process GCSTelemetryStats updates
// (used to establish the connection)
gcsStatsObj = objMngr.getObject("GCSTelemetryStats");
if ( ((String) gcsStatsObj.getField("Status").getValue()).compareTo("Connected") != 0 )
{
if ( objInfo.obj.getObjID() != objMngr.getObject("GCSTelemetryStats").getObjID() )
{
if (DEBUG) Log.d(TAG,"transactionCompleted(false) due to receiving object not GCSTelemetryStats while not connected.");
objInfo.obj.transactionCompleted(false);
return;
}
}
Log.e(TAG, "A");
// 2. Setup transaction (skip if unpack event)
if ( objInfo.event != EV_UNPACKED )
{
Log.e(TAG, "A1");
UAVObject.Metadata metadata = objInfo.obj.getMetadata();
transInfo.obj = objInfo.obj;
transInfo.allInstances = objInfo.allInstances;
transInfo.retriesRemaining = MAX_RETRIES;
transInfo.acked = metadata.GetGcsTelemetryAcked();
if ( objInfo.event == EV_UPDATED || objInfo.event == EV_UPDATED_MANUAL )
{
transInfo.objRequest = false;
}
else if ( objInfo.event == EV_UPDATE_REQ )
{
transInfo.objRequest = true;
}
// Start transaction
transPending = true;
}
Log.e(TAG, "B");
// If this is a metaobject then make necessary telemetry updates (this is why we catch unpack)
if (objInfo.obj.isMetadata())
{
UAVMetaObject metaobj = (UAVMetaObject) objInfo.obj;
updateObject( metaobj.getParentObject() );
}
Log.e(TAG, "C");
// 3. Execute transaction
if (transPending)
{
Log.e(TAG, "D");
try {
if (DEBUG || true) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName());
// Initiate transaction
if (transInfo.objRequest)
{
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
}
else
{
Log.d(TAG, "Sending object");
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
}
// TODO: Block if request expected (??)
if ( transInfo.objRequest || transInfo.acked )
{
transTimerSetPeriod(REQ_TIMEOUT_MS);
}
else
{
synchronized(transTimer) {
transTimer.cancel();
transPending = false;
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
Log.e(TAG, "E");
e.printStackTrace();
}
}
}
}
}

View File

@ -1,6 +1,6 @@
package org.openpilot.uavtalk;
import static org.junit.Assert.*;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetAddress;
@ -8,11 +8,12 @@ import java.net.Socket;
import org.junit.Test;
import org.openpilot.uavtalk.uavobjects.UAVObjectsInitialize;
import org.openpilot.uavtalk.UAVTalk;
import android.os.Looper;
public class TelemetryMonitorTest {
static UAVObjectManager objMngr;
static UAVTalk talk;
static final String IP_ADDRDESS = new String("127.0.0.1");
@ -32,7 +33,7 @@ public class TelemetryMonitorTest {
e.printStackTrace();
fail("Couldn't connect to test platform");
}
try {
talk = new UAVTalk(connection.getInputStream(), connection.getOutputStream(), objMngr);
} catch (IOException e) {
@ -40,16 +41,17 @@ public class TelemetryMonitorTest {
e.printStackTrace();
fail("Couldn't construct UAVTalk object");
}
Thread inputStream = talk.getInputProcessThread();
inputStream.start();
Telemetry tel = new Telemetry(talk, objMngr);
Looper.prepare();
Telemetry tel = new Telemetry(talk, objMngr, Looper.myLooper());
@SuppressWarnings("unused")
TelemetryMonitor mon = new TelemetryMonitor(objMngr,tel);
Thread.sleep(10000);
System.out.println("Done");
}