mirror of
https://bitbucket.org/librepilot/librepilot.git
synced 2025-01-17 02:52:12 +01:00
Made a lot of critical functions synchronized to block race conditions
(essentialy implements a mutex locker for that object). Also added callbacks to UAVObjects for unpacked and updated. More to come. Finally test case that checks that we get FlightStatus through UAVTalk (i.e. that the aircraft is talking).
This commit is contained in:
parent
c7f57dffe3
commit
97b5f758a0
@ -3,9 +3,50 @@ package org.openpilot.uavtalk;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Observer;
|
||||
import java.util.Observable;
|
||||
|
||||
public abstract class UAVObject {
|
||||
|
||||
public class CallbackListener extends Observable {
|
||||
private UAVObject parent;
|
||||
|
||||
public CallbackListener(UAVObject parent) {
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
public void event () {
|
||||
setChanged();
|
||||
notifyObservers(parent);
|
||||
}
|
||||
}
|
||||
|
||||
private CallbackListener updatedListeners = new CallbackListener(this);
|
||||
public void addUpdatedObserver(Observer o) {
|
||||
synchronized(updatedListeners) {
|
||||
updatedListeners.addObserver(o);
|
||||
}
|
||||
}
|
||||
void updated() {
|
||||
synchronized(updatedListeners) {
|
||||
updatedListeners.event();
|
||||
}
|
||||
}
|
||||
|
||||
private CallbackListener unpackedListeners = new CallbackListener(this);
|
||||
public void addUnpackedObserver(Observer o) {
|
||||
synchronized(unpackedListeners) {
|
||||
unpackedListeners.addObserver(o);
|
||||
}
|
||||
}
|
||||
void unpacked() {
|
||||
synchronized(unpackedListeners) {
|
||||
System.out.println("Unpacked!: " + unpackedListeners.countObservers() + " " + getName());
|
||||
unpackedListeners.event();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Object update mode
|
||||
*/
|
||||
@ -298,10 +339,12 @@ public abstract class UAVObject {
|
||||
UAVObjectField field = li.next();
|
||||
numBytes += field.unpack(dataIn);
|
||||
}
|
||||
|
||||
// Trigger all the listeners for the unpack event
|
||||
unpacked();
|
||||
updated();
|
||||
|
||||
return numBytes;
|
||||
// TODO: Callbacks
|
||||
// emit objectUnpacked(this); // trigger object updated event
|
||||
// emit objectUpdated(this);
|
||||
}
|
||||
|
||||
// /**
|
||||
|
@ -91,8 +91,7 @@ public class UAVObjectField {
|
||||
* @param dataOut
|
||||
* @return the number of bytes added
|
||||
**/
|
||||
public int pack(ByteBuffer dataOut) {
|
||||
//QMutexLocker locker(obj->getMutex());
|
||||
public synchronized int pack(ByteBuffer dataOut) {
|
||||
// Pack each element in output buffer
|
||||
dataOut.order(ByteOrder.LITTLE_ENDIAN);
|
||||
switch (type)
|
||||
@ -153,7 +152,7 @@ public class UAVObjectField {
|
||||
return getNumBytes();
|
||||
}
|
||||
|
||||
public int unpack(ByteBuffer dataIn) {
|
||||
public synchronized int unpack(ByteBuffer dataIn) {
|
||||
// Unpack each element from input buffer
|
||||
dataIn.order(ByteOrder.LITTLE_ENDIAN);
|
||||
switch (type)
|
||||
@ -240,10 +239,9 @@ public class UAVObjectField {
|
||||
return getNumBytes();
|
||||
}
|
||||
|
||||
Object getValue() { return getValue(0); };
|
||||
public Object getValue() { return getValue(0); };
|
||||
@SuppressWarnings("unchecked")
|
||||
Object getValue(int index) {
|
||||
// QMutexLocker locker(obj->getMutex());
|
||||
public synchronized Object getValue(int index) {
|
||||
// Check that index is not out of bounds
|
||||
if ( index >= numElements )
|
||||
{
|
||||
@ -287,8 +285,7 @@ public class UAVObjectField {
|
||||
|
||||
public void setValue(Object data) { setValue(data,0); }
|
||||
@SuppressWarnings("unchecked")
|
||||
public void setValue(Object data, int index) {
|
||||
// QMutexLocker locker(obj->getMutex());
|
||||
public synchronized void setValue(Object data, int index) {
|
||||
// Check that index is not out of bounds
|
||||
//if ( index >= numElements );
|
||||
//throw new Exception("Index out of bounds");
|
||||
@ -361,6 +358,7 @@ public class UAVObjectField {
|
||||
//throw new Exception("Sorry I haven't implemented strings yet");
|
||||
}
|
||||
}
|
||||
obj.updated();
|
||||
}
|
||||
}
|
||||
|
||||
@ -449,7 +447,7 @@ public class UAVObjectField {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void clear() {
|
||||
public synchronized void clear() {
|
||||
switch (type)
|
||||
{
|
||||
case INT8:
|
||||
@ -503,7 +501,7 @@ public class UAVObjectField {
|
||||
}
|
||||
}
|
||||
|
||||
public void constructorInitialize(String name, String units, FieldType type, List<String> elementNames, List<String> options) {
|
||||
public synchronized void constructorInitialize(String name, String units, FieldType type, List<String> elementNames, List<String> options) {
|
||||
// Copy params
|
||||
this.name = name;
|
||||
this.units = units;
|
||||
|
@ -23,7 +23,7 @@ public class UAVObjectManager {
|
||||
* updates.
|
||||
* @throws Exception
|
||||
*/
|
||||
public boolean registerObject(UAVDataObject obj) throws Exception
|
||||
public synchronized boolean registerObject(UAVDataObject obj) throws Exception
|
||||
{
|
||||
// QMutexLocker locker(mutex);
|
||||
|
||||
@ -128,7 +128,7 @@ public class UAVObjectManager {
|
||||
return true;
|
||||
}
|
||||
|
||||
public void addObject(UAVObject obj)
|
||||
public synchronized void addObject(UAVObject obj)
|
||||
{
|
||||
// Add to list
|
||||
List<UAVObject> ls = new ArrayList<UAVObject>();
|
||||
@ -143,15 +143,15 @@ public class UAVObjectManager {
|
||||
*/
|
||||
public List<List<UAVObject>> getObjects()
|
||||
{
|
||||
//QMutexLocker locker(mutex);
|
||||
return objects;
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as getObjects() but will only return DataObjects.
|
||||
*/
|
||||
public List< List<UAVDataObject> > getDataObjects()
|
||||
public List< List<UAVDataObject> > getDataObjects()
|
||||
{
|
||||
assert(false); // TOOD This
|
||||
return new ArrayList<List<UAVDataObject>>();
|
||||
|
||||
/* QMutexLocker locker(mutex);
|
||||
@ -190,6 +190,7 @@ public class UAVObjectManager {
|
||||
*/
|
||||
public List <List<UAVMetaObject> > getMetaObjects()
|
||||
{
|
||||
assert(false); // TODO
|
||||
return new ArrayList< List<UAVMetaObject> >();
|
||||
/*
|
||||
QMutexLocker locker(mutex);
|
||||
@ -267,7 +268,7 @@ public class UAVObjectManager {
|
||||
/**
|
||||
* Helper function for the public getObject() functions.
|
||||
*/
|
||||
public UAVObject getObject(String name, int objId, int instId)
|
||||
public synchronized UAVObject getObject(String name, int objId, int instId)
|
||||
{
|
||||
//QMutexLocker locker(mutex);
|
||||
// Check if this object type is already in the list
|
||||
@ -310,9 +311,8 @@ public class UAVObjectManager {
|
||||
/**
|
||||
* Helper function for the public getObjectInstances()
|
||||
*/
|
||||
public List<UAVObject> getObjectInstances(String name, int objId)
|
||||
public synchronized List<UAVObject> getObjectInstances(String name, int objId)
|
||||
{
|
||||
//QMutexLocker locker(mutex);
|
||||
// Check if this object type is already in the list
|
||||
ListIterator<List<UAVObject>> objIter = objects.listIterator();
|
||||
while(objIter.hasNext()) {
|
||||
|
@ -10,6 +10,22 @@ import java.nio.ByteOrder;
|
||||
|
||||
public class UAVTalk {
|
||||
|
||||
private Thread inputProcessingThread = null;
|
||||
/**
|
||||
* A reference to the thread for processing the incoming stream
|
||||
* @return
|
||||
*/
|
||||
public Thread getInputProcessThread() {
|
||||
if(inputProcessingThread == null)
|
||||
|
||||
inputProcessingThread = new Thread() {
|
||||
public void run() {
|
||||
processInputStream();
|
||||
}
|
||||
};
|
||||
return inputProcessingThread;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constants
|
||||
*/
|
||||
@ -684,7 +700,7 @@ public class UAVTalk {
|
||||
* \param[in] type Transaction type
|
||||
* \return Success (true), Failure (false)
|
||||
*/
|
||||
public boolean transmitSingleObject(UAVObject obj, int type, boolean allInstances)
|
||||
public synchronized boolean transmitSingleObject(UAVObject obj, int type, boolean allInstances)
|
||||
{
|
||||
int length;
|
||||
int dataOffset;
|
||||
|
@ -9,6 +9,8 @@ import java.net.DatagramSocket;
|
||||
import java.net.InetAddress;
|
||||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Observable;
|
||||
import java.util.Observer;
|
||||
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
@ -20,8 +22,9 @@ public class TalkTest {
|
||||
|
||||
static UAVObjectManager objMngr;
|
||||
static final String IP_ADDRDESS = new String("127.0.0.1");
|
||||
static final int PORT_NUM = 8000;
|
||||
|
||||
static final int PORT_NUM = 7777;
|
||||
boolean succeed = false;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
objMngr = new UAVObjectManager();
|
||||
@ -29,7 +32,7 @@ public class TalkTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProcessInputStream() {
|
||||
public void testGetFlightStatus() {
|
||||
Socket connection = null;
|
||||
UAVTalk talk = null;
|
||||
try{
|
||||
@ -48,7 +51,29 @@ public class TalkTest {
|
||||
fail("Couldn't construct UAVTalk object");
|
||||
}
|
||||
|
||||
talk.processInputStream();
|
||||
Thread inputStream = talk.getInputProcessThread();
|
||||
inputStream.start();
|
||||
|
||||
succeed = false;
|
||||
|
||||
UAVObject obj = objMngr.getObject("FlightTelemetryStats");
|
||||
|
||||
obj.addUpdatedObserver( new Observer() {
|
||||
public void update(Observable observable, Object data) {
|
||||
// TODO Auto-generated method stub
|
||||
System.out.println("Updated: " + data.toString());
|
||||
succeed = true;
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e1) {
|
||||
}
|
||||
|
||||
if(!succeed)
|
||||
fail("Never received a FlightTelemetryStats update");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
x
Reference in New Issue
Block a user