1
0
mirror of https://bitbucket.org/librepilot/librepilot.git synced 2024-11-30 08:24:11 +01:00

Made a lot of critical functions synchronized to block race conditions

(essentialy implements a mutex locker for that object).  Also added callbacks
to UAVObjects for unpacked and updated.  More to come.  Finally test case that
checks that we get FlightStatus through UAVTalk (i.e. that the aircraft is
talking).
This commit is contained in:
James Cotton 2011-03-07 04:54:43 -06:00
parent c7961b9f38
commit 7d13f4869d
5 changed files with 107 additions and 25 deletions

View File

@ -3,9 +3,50 @@ package org.openpilot.uavtalk;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.ListIterator;
import java.util.Observer;
import java.util.Observable;
public abstract class UAVObject {
public class CallbackListener extends Observable {
private UAVObject parent;
public CallbackListener(UAVObject parent) {
this.parent = parent;
}
public void event () {
setChanged();
notifyObservers(parent);
}
}
private CallbackListener updatedListeners = new CallbackListener(this);
public void addUpdatedObserver(Observer o) {
synchronized(updatedListeners) {
updatedListeners.addObserver(o);
}
}
void updated() {
synchronized(updatedListeners) {
updatedListeners.event();
}
}
private CallbackListener unpackedListeners = new CallbackListener(this);
public void addUnpackedObserver(Observer o) {
synchronized(unpackedListeners) {
unpackedListeners.addObserver(o);
}
}
void unpacked() {
synchronized(unpackedListeners) {
System.out.println("Unpacked!: " + unpackedListeners.countObservers() + " " + getName());
unpackedListeners.event();
}
}
/**
* Object update mode
*/
@ -298,10 +339,12 @@ public abstract class UAVObject {
UAVObjectField field = li.next();
numBytes += field.unpack(dataIn);
}
// Trigger all the listeners for the unpack event
unpacked();
updated();
return numBytes;
// TODO: Callbacks
// emit objectUnpacked(this); // trigger object updated event
// emit objectUpdated(this);
}
// /**

View File

@ -91,8 +91,7 @@ public class UAVObjectField {
* @param dataOut
* @return the number of bytes added
**/
public int pack(ByteBuffer dataOut) {
//QMutexLocker locker(obj->getMutex());
public synchronized int pack(ByteBuffer dataOut) {
// Pack each element in output buffer
dataOut.order(ByteOrder.LITTLE_ENDIAN);
switch (type)
@ -153,7 +152,7 @@ public class UAVObjectField {
return getNumBytes();
}
public int unpack(ByteBuffer dataIn) {
public synchronized int unpack(ByteBuffer dataIn) {
// Unpack each element from input buffer
dataIn.order(ByteOrder.LITTLE_ENDIAN);
switch (type)
@ -240,10 +239,9 @@ public class UAVObjectField {
return getNumBytes();
}
Object getValue() { return getValue(0); };
public Object getValue() { return getValue(0); };
@SuppressWarnings("unchecked")
Object getValue(int index) {
// QMutexLocker locker(obj->getMutex());
public synchronized Object getValue(int index) {
// Check that index is not out of bounds
if ( index >= numElements )
{
@ -287,8 +285,7 @@ public class UAVObjectField {
public void setValue(Object data) { setValue(data,0); }
@SuppressWarnings("unchecked")
public void setValue(Object data, int index) {
// QMutexLocker locker(obj->getMutex());
public synchronized void setValue(Object data, int index) {
// Check that index is not out of bounds
//if ( index >= numElements );
//throw new Exception("Index out of bounds");
@ -361,6 +358,7 @@ public class UAVObjectField {
//throw new Exception("Sorry I haven't implemented strings yet");
}
}
obj.updated();
}
}
@ -449,7 +447,7 @@ public class UAVObjectField {
}
@SuppressWarnings("unchecked")
public void clear() {
public synchronized void clear() {
switch (type)
{
case INT8:
@ -503,7 +501,7 @@ public class UAVObjectField {
}
}
public void constructorInitialize(String name, String units, FieldType type, List<String> elementNames, List<String> options) {
public synchronized void constructorInitialize(String name, String units, FieldType type, List<String> elementNames, List<String> options) {
// Copy params
this.name = name;
this.units = units;

View File

@ -23,7 +23,7 @@ public class UAVObjectManager {
* updates.
* @throws Exception
*/
public boolean registerObject(UAVDataObject obj) throws Exception
public synchronized boolean registerObject(UAVDataObject obj) throws Exception
{
// QMutexLocker locker(mutex);
@ -128,7 +128,7 @@ public class UAVObjectManager {
return true;
}
public void addObject(UAVObject obj)
public synchronized void addObject(UAVObject obj)
{
// Add to list
List<UAVObject> ls = new ArrayList<UAVObject>();
@ -143,15 +143,15 @@ public class UAVObjectManager {
*/
public List<List<UAVObject>> getObjects()
{
//QMutexLocker locker(mutex);
return objects;
}
/**
* Same as getObjects() but will only return DataObjects.
*/
public List< List<UAVDataObject> > getDataObjects()
public List< List<UAVDataObject> > getDataObjects()
{
assert(false); // TOOD This
return new ArrayList<List<UAVDataObject>>();
/* QMutexLocker locker(mutex);
@ -190,6 +190,7 @@ public class UAVObjectManager {
*/
public List <List<UAVMetaObject> > getMetaObjects()
{
assert(false); // TODO
return new ArrayList< List<UAVMetaObject> >();
/*
QMutexLocker locker(mutex);
@ -267,7 +268,7 @@ public class UAVObjectManager {
/**
* Helper function for the public getObject() functions.
*/
public UAVObject getObject(String name, int objId, int instId)
public synchronized UAVObject getObject(String name, int objId, int instId)
{
//QMutexLocker locker(mutex);
// Check if this object type is already in the list
@ -310,9 +311,8 @@ public class UAVObjectManager {
/**
* Helper function for the public getObjectInstances()
*/
public List<UAVObject> getObjectInstances(String name, int objId)
public synchronized List<UAVObject> getObjectInstances(String name, int objId)
{
//QMutexLocker locker(mutex);
// Check if this object type is already in the list
ListIterator<List<UAVObject>> objIter = objects.listIterator();
while(objIter.hasNext()) {

View File

@ -10,6 +10,22 @@ import java.nio.ByteOrder;
public class UAVTalk {
private Thread inputProcessingThread = null;
/**
* A reference to the thread for processing the incoming stream
* @return
*/
public Thread getInputProcessThread() {
if(inputProcessingThread == null)
inputProcessingThread = new Thread() {
public void run() {
processInputStream();
}
};
return inputProcessingThread;
}
/**
* Constants
*/
@ -684,7 +700,7 @@ public class UAVTalk {
* \param[in] type Transaction type
* \return Success (true), Failure (false)
*/
public boolean transmitSingleObject(UAVObject obj, int type, boolean allInstances)
public synchronized boolean transmitSingleObject(UAVObject obj, int type, boolean allInstances)
{
int length;
int dataOffset;

View File

@ -9,6 +9,8 @@ import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Observable;
import java.util.Observer;
import org.junit.BeforeClass;
import org.junit.Test;
@ -20,8 +22,9 @@ public class TalkTest {
static UAVObjectManager objMngr;
static final String IP_ADDRDESS = new String("127.0.0.1");
static final int PORT_NUM = 8000;
static final int PORT_NUM = 7777;
boolean succeed = false;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
objMngr = new UAVObjectManager();
@ -29,7 +32,7 @@ public class TalkTest {
}
@Test
public void testProcessInputStream() {
public void testGetFlightStatus() {
Socket connection = null;
UAVTalk talk = null;
try{
@ -48,7 +51,29 @@ public class TalkTest {
fail("Couldn't construct UAVTalk object");
}
talk.processInputStream();
Thread inputStream = talk.getInputProcessThread();
inputStream.start();
succeed = false;
UAVObject obj = objMngr.getObject("FlightTelemetryStats");
obj.addUpdatedObserver( new Observer() {
public void update(Observable observable, Object data) {
// TODO Auto-generated method stub
System.out.println("Updated: " + data.toString());
succeed = true;
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
}
if(!succeed)
fail("Never received a FlightTelemetryStats update");
}
@Test