1
0
mirror of https://bitbucket.org/librepilot/librepilot.git synced 2025-02-19 09:54:15 +01:00

AndroidGCS UAVTalk: Pretty thorough clean up focused on how transactions are

handled at the UAVTalk level where there can only be one pending transaction on
the wire.
This commit is contained in:
James Cotton 2012-08-10 14:18:41 -05:00
parent 6b9b49734e
commit 6ebf4fe87c
3 changed files with 297 additions and 221 deletions

View File

@ -118,7 +118,7 @@ public class Telemetry {
@Override
void TransactionSucceeded(UAVObject data) {
try {
transactionCompleted(data);
transactionCompleted(data, true);
} catch (IOException e) {
// Disconnect when stream fails
utalk.setOnTransactionCompletedListener(null);
@ -127,8 +127,8 @@ public class Telemetry {
@Override
void TransactionFailed(UAVObject data) {
try {
Log.d(TAG, "TransactionFailed(" + data.getName() + ")");
transactionCompleted(data);
if (DEBUG) Log.d(TAG, "TransactionFailed(" + data.getName() + ")");
transactionCompleted(data, false);
} catch (IOException e) {
// Disconnect when stream fails
utalk.setOnTransactionCompletedListener(null);
@ -376,7 +376,7 @@ public class Telemetry {
* Called when a transaction is successfully completed (uavtalk event)
* @throws IOException
*/
private synchronized void transactionCompleted(UAVObject obj) throws IOException
private synchronized void transactionCompleted(UAVObject obj, boolean result) throws IOException
{
if (DEBUG) Log.d(TAG,"UAVTalk transactionCompleted");
// Check if there is a pending transaction and the objects match
@ -386,8 +386,9 @@ public class Telemetry {
// Complete transaction
transTimer.cancel();
transPending = false;
// Send signal
obj.transactionCompleted(true);
//Send signal
obj.transactionCompleted(result);
// Process new object updates from queue
processObjectQueue();
} else
@ -416,13 +417,9 @@ public class Telemetry {
}
else
{
// Terminate transaction
utalk.cancelTransaction();
transPending = false;
// Send signal
transInfo.obj.transactionCompleted(false);
// Process new object updates from queue
processObjectQueue();
// Terminate transaction. This triggers UAVTalk to send a transaction
// failed signal which will make the next queue entry be processed
//utalk.cancelPendingTransaction();
++txErrors;
}
}

View File

@ -39,9 +39,10 @@ import android.util.Log;
public class TelemetryMonitor extends Observable {
private static final String TAG = "TelemetryMonitor";
public static int LOGLEVEL = 0;
public static boolean WARN = LOGLEVEL > 1;
public static boolean DEBUG = LOGLEVEL > 0;
public static final int LOGLEVEL = 0;
public static boolean DEBUG = LOGLEVEL > 2;
public static final boolean WARN = LOGLEVEL > 1;
public static final boolean ERROR = LOGLEVEL > 0;
static final int STATS_UPDATE_PERIOD_MS = 4000;
static final int STATS_CONNECT_PERIOD_MS = 1000;
@ -208,7 +209,7 @@ public class TelemetryMonitor extends Observable {
if (!success) {
// Right now success = false means received a NAK so don't
// re-attempt
Log.e(TAG, "Transaction failed.");
if (ERROR) Log.e(TAG, "Transaction failed.");
}
// Process next object if telemetry is still available

View File

@ -106,9 +106,13 @@ public class UAVTalk {
static final int TYPE_MASK = 0xF8;
static final int TYPE_VER = 0x20;
//! Packet contains an object
static final int TYPE_OBJ = (TYPE_VER | 0x00);
//! Packet is a request for an object
static final int TYPE_OBJ_REQ = (TYPE_VER | 0x01);
//! Packet is an object with a request for an ack
static final int TYPE_OBJ_ACK = (TYPE_VER | 0x02);
//! Packet is an ack for an object
static final int TYPE_ACK = (TYPE_VER | 0x03);
static final int TYPE_NACK = (TYPE_VER | 0x04);
@ -135,8 +139,14 @@ public class UAVTalk {
OutputStream outStream;
UAVObjectManager objMngr;
//! Currently only one UAVTalk transaction is permitted at a time. If this is null none are in process
//! otherwise points to the pending object
UAVObject respObj;
//! If the pending transaction is for all the instances
boolean respAllInstances;
//! The type of response we are expecting
int respType;
// Variables used by the receive state machine
ByteBuffer rxTmpBuffer /* 4 */;
ByteBuffer rxBuffer;
@ -234,7 +244,6 @@ public class UAVTalk {
* @throws IOException
*/
public boolean sendObjectRequest(UAVObject obj, boolean allInstances) throws IOException {
// QMutexLocker locker(mutex);
return objectTransaction(obj, TYPE_OBJ_REQ, allInstances);
}
@ -245,8 +254,7 @@ public class UAVTalk {
* Success (true), Failure (false)
* @throws IOException
*/
public synchronized boolean sendObject(UAVObject obj, boolean acked,
boolean allInstances) throws IOException {
public boolean sendObject(UAVObject obj, boolean acked, boolean allInstances) throws IOException {
if (acked) {
return objectTransaction(obj, TYPE_OBJ_ACK, allInstances);
} else {
@ -255,10 +263,46 @@ public class UAVTalk {
}
/**
* Cancel a pending transaction
* UAVTalk takes care of it's own transactions but if the caller knows
* it wants to give up on one (after a timeout) then it can cancel it
* @return True if that object was pending, False otherwise
*/
public synchronized void cancelTransaction() {
public synchronized boolean cancelPendingTransaction(UAVObject obj) {
if(respObj != null && respObj.getObjID() == obj.getObjID()) {
if(transactionListener != null) {
Log.d(TAG,"Canceling transaction: " + respObj.getName());
transactionListener.TransactionFailed(respObj);
}
respObj = null;
return true;
} else
return false;
}
/**
* Cancel a pending transaction. If there is a pending transaction and
* a listener then notify them that the transaction failed.
*/
/*private synchronized void cancelPendingTransaction() {
if(respObj != null && transactionListener != null) {
Log.d(TAG,"Canceling transaction: " + respObj.getName());
transactionListener.TransactionFailed(respObj);
}
respObj = null;
}*/
/**
* This is the code that sets up a new UAVTalk packet that expects a response.
*/
private synchronized void setupTransaction(UAVObject obj, boolean allInstances, int type) {
// Only cancel if it is for a different object
if(respObj != null && respObj.getObjID() != obj.getObjID())
cancelPendingTransaction(obj);
respObj = obj;
respAllInstances = allInstances;
respType = type;
}
/**
@ -269,21 +313,9 @@ public class UAVTalk {
* Success (true), Failure (false)
* @throws IOException
*/
public boolean objectTransaction(UAVObject obj, int type,
boolean allInstances) throws IOException {
// Send object depending on if a response is needed
if (type == TYPE_OBJ_ACK || type == TYPE_OBJ_REQ) {
if (transmitObject(obj, type, allInstances)) {
if(type == TYPE_OBJ_REQ)
if (ERROR) Log.e(TAG, "Sending obj req");
respObj = obj;
respAllInstances = allInstances;
return true;
} else {
return false;
}
} else if (type == TYPE_OBJ) {
return transmitObject(obj, TYPE_OBJ, allInstances);
private synchronized boolean objectTransaction(UAVObject obj, int type, boolean allInstances) throws IOException {
if (type == TYPE_OBJ_ACK || type == TYPE_OBJ_REQ || type == TYPE_OBJ) {
return transmitObject(obj, type, allInstances);
} else {
return false;
}
@ -294,207 +326,210 @@ public class UAVTalk {
* byte \return Success (true), Failure (false)
* @throws IOException
*/
public synchronized boolean processInputByte(int rxbyte) throws IOException {
public boolean processInputByte(int rxbyte) throws IOException {
Assert.assertNotNull(objMngr);
// Update stats
stats.rxBytes++;
// Only need to synchronize this method on the state machine state
synchronized(rxState) {
// Update stats
stats.rxBytes++;
rxPacketLength++; // update packet byte count
rxPacketLength++; // update packet byte count
// Receive state machine
switch (rxState) {
case STATE_SYNC:
// Receive state machine
switch (rxState) {
case STATE_SYNC:
if (rxbyte != SYNC_VAL)
if (rxbyte != SYNC_VAL)
break;
// Initialize and update CRC
rxCS = updateCRC(0, rxbyte);
rxPacketLength = 1;
rxState = RxStateType.STATE_TYPE;
break;
// Initialize and update CRC
rxCS = updateCRC(0, rxbyte);
case STATE_TYPE:
rxPacketLength = 1;
// Update CRC
rxCS = updateCRC(rxCS, rxbyte);
rxState = RxStateType.STATE_TYPE;
break;
case STATE_TYPE:
// Update CRC
rxCS = updateCRC(rxCS, rxbyte);
if ((rxbyte & TYPE_MASK) != TYPE_VER) {
Log.e(TAG, "Unknown UAVTalk type:" + rxbyte);
rxState = RxStateType.STATE_SYNC;
break;
}
rxType = rxbyte;
if (VERBOSE) Log.v(TAG, "Received packet type: " + rxType);
packetSize = 0;
rxState = RxStateType.STATE_SIZE;
rxCount = 0;
break;
case STATE_SIZE:
// Update CRC
rxCS = updateCRC(rxCS, rxbyte);
if (rxCount == 0) {
packetSize += rxbyte;
rxCount++;
break;
}
packetSize += (rxbyte << 8) & 0xff00;
if (packetSize < MIN_HEADER_LENGTH
|| packetSize > MAX_HEADER_LENGTH + MAX_PAYLOAD_LENGTH) { // incorrect
// packet
// size
rxState = RxStateType.STATE_SYNC;
break;
}
rxCount = 0;
rxState = RxStateType.STATE_OBJID;
rxTmpBuffer.position(0);
break;
case STATE_OBJID:
// Update CRC
rxCS = updateCRC(rxCS, rxbyte);
rxTmpBuffer.put(rxCount++, (byte) (rxbyte & 0xff));
if (rxCount < 4)
break;
// Search for object, if not found reset state machine
rxObjId = rxTmpBuffer.getInt(0);
// Because java treats ints as only signed we need to do this manually
if (rxObjId < 0)
rxObjId = 0x100000000l + rxObjId;
{
UAVObject rxObj = objMngr.getObject(rxObjId);
if (rxObj == null) {
if (DEBUG) Log.d(TAG, "Unknown ID: " + rxObjId);
stats.rxErrors++;
if ((rxbyte & TYPE_MASK) != TYPE_VER) {
Log.e(TAG, "Unknown UAVTalk type:" + rxbyte);
rxState = RxStateType.STATE_SYNC;
break;
}
// Determine data length
if (rxType == TYPE_OBJ_REQ || rxType == TYPE_ACK || rxType == TYPE_NACK)
rxLength = 0;
else
rxLength = rxObj.getNumBytes();
rxType = rxbyte;
if (VERBOSE) Log.v(TAG, "Received packet type: " + rxType);
packetSize = 0;
// Check length and determine next state
if (rxLength >= MAX_PAYLOAD_LENGTH) {
stats.rxErrors++;
rxState = RxStateType.STATE_SIZE;
rxCount = 0;
break;
case STATE_SIZE:
// Update CRC
rxCS = updateCRC(rxCS, rxbyte);
if (rxCount == 0) {
packetSize += rxbyte;
rxCount++;
break;
}
packetSize += (rxbyte << 8) & 0xff00;
if (packetSize < MIN_HEADER_LENGTH
|| packetSize > MAX_HEADER_LENGTH + MAX_PAYLOAD_LENGTH) { // incorrect
// packet
// size
rxState = RxStateType.STATE_SYNC;
break;
}
// Check the lengths match
if ((rxPacketLength + rxLength) != packetSize) { // packet error
// -
// mismatched
// packet
// size
stats.rxErrors++;
rxState = RxStateType.STATE_SYNC;
break;
}
rxCount = 0;
rxState = RxStateType.STATE_OBJID;
rxTmpBuffer.position(0);
break;
// Check if this is a single instance object (i.e. if the
// instance ID field is coming next)
if (rxObj.isSingleInstance()) {
// If there is a payload get it, otherwise receive checksum
if (rxLength > 0)
rxState = RxStateType.STATE_DATA;
case STATE_OBJID:
// Update CRC
rxCS = updateCRC(rxCS, rxbyte);
rxTmpBuffer.put(rxCount++, (byte) (rxbyte & 0xff));
if (rxCount < 4)
break;
// Search for object, if not found reset state machine
rxObjId = rxTmpBuffer.getInt(0);
// Because java treats ints as only signed we need to do this manually
if (rxObjId < 0)
rxObjId = 0x100000000l + rxObjId;
{
UAVObject rxObj = objMngr.getObject(rxObjId);
if (rxObj == null) {
if (DEBUG) Log.d(TAG, "Unknown ID: " + rxObjId);
stats.rxErrors++;
rxState = RxStateType.STATE_SYNC;
break;
}
// Determine data length
if (rxType == TYPE_OBJ_REQ || rxType == TYPE_ACK || rxType == TYPE_NACK)
rxLength = 0;
else
rxState = RxStateType.STATE_CS;
rxInstId = 0;
rxCount = 0;
} else {
rxState = RxStateType.STATE_INSTID;
rxCount = 0;
rxLength = rxObj.getNumBytes();
// Check length and determine next state
if (rxLength >= MAX_PAYLOAD_LENGTH) {
stats.rxErrors++;
rxState = RxStateType.STATE_SYNC;
break;
}
// Check the lengths match
if ((rxPacketLength + rxLength) != packetSize) { // packet error
// -
// mismatched
// packet
// size
stats.rxErrors++;
rxState = RxStateType.STATE_SYNC;
break;
}
// Check if this is a single instance object (i.e. if the
// instance ID field is coming next)
if (rxObj.isSingleInstance()) {
// If there is a payload get it, otherwise receive checksum
if (rxLength > 0)
rxState = RxStateType.STATE_DATA;
else
rxState = RxStateType.STATE_CS;
rxInstId = 0;
rxCount = 0;
} else {
rxState = RxStateType.STATE_INSTID;
rxCount = 0;
}
}
}
break;
case STATE_INSTID:
// Update CRC
rxCS = updateCRC(rxCS, rxbyte);
rxTmpBuffer.put(rxCount++, (byte) (rxbyte & 0xff));
if (rxCount < 2)
break;
rxInstId = rxTmpBuffer.getShort(0);
case STATE_INSTID:
rxCount = 0;
// Update CRC
rxCS = updateCRC(rxCS, rxbyte);
rxTmpBuffer.put(rxCount++, (byte) (rxbyte & 0xff));
if (rxCount < 2)
break;
rxInstId = rxTmpBuffer.getShort(0);
rxCount = 0;
// If there is a payload get it, otherwise receive checksum
if (rxLength > 0)
rxState = RxStateType.STATE_DATA;
else
rxState = RxStateType.STATE_CS;
break;
case STATE_DATA:
// Update CRC
rxCS = updateCRC(rxCS, rxbyte);
rxBuffer.put(rxCount++, (byte) (rxbyte & 0xff));
if (rxCount < rxLength)
break;
// If there is a payload get it, otherwise receive checksum
if (rxLength > 0)
rxState = RxStateType.STATE_DATA;
else
rxState = RxStateType.STATE_CS;
break;
case STATE_DATA:
// Update CRC
rxCS = updateCRC(rxCS, rxbyte);
rxBuffer.put(rxCount++, (byte) (rxbyte & 0xff));
if (rxCount < rxLength)
rxCount = 0;
break;
rxState = RxStateType.STATE_CS;
rxCount = 0;
break;
case STATE_CS:
case STATE_CS:
// The CRC byte
rxCSPacket = rxbyte;
// The CRC byte
rxCSPacket = rxbyte;
if (rxCS != rxCSPacket) { // packet error - faulty CRC
if (DEBUG) Log.d(TAG,"Bad crc");
stats.rxErrors++;
rxState = RxStateType.STATE_SYNC;
break;
}
if (rxPacketLength != (packetSize + 1)) { // packet error -
// mismatched packet
// size
if (DEBUG) Log.d(TAG,"Bad size");
stats.rxErrors++;
rxState = RxStateType.STATE_SYNC;
break;
}
if (DEBUG) Log.d(TAG,"Received");
rxBuffer.position(0);
receiveObject(rxType, rxObjId, rxInstId, rxBuffer);
stats.rxObjectBytes += rxLength;
stats.rxObjects++;
if (rxCS != rxCSPacket) { // packet error - faulty CRC
if (DEBUG) Log.d(TAG,"Bad crc");
stats.rxErrors++;
rxState = RxStateType.STATE_SYNC;
break;
}
if (rxPacketLength != (packetSize + 1)) { // packet error -
// mismatched packet
// size
if (DEBUG) Log.d(TAG,"Bad size");
stats.rxErrors++;
default:
rxState = RxStateType.STATE_SYNC;
break;
stats.rxErrors++;
}
if (DEBUG) Log.d(TAG,"Received");
rxBuffer.position(0);
receiveObject(rxType, rxObjId, rxInstId, rxBuffer);
stats.rxObjectBytes += rxLength;
stats.rxObjects++;
rxState = RxStateType.STATE_SYNC;
break;
default:
rxState = RxStateType.STATE_SYNC;
stats.rxErrors++;
}
// Done
@ -510,8 +545,7 @@ public class UAVTalk {
* length \return Success (true), Failure (false)
* @throws IOException
*/
public boolean receiveObject(int type, long objId, long instId,
ByteBuffer data) throws IOException {
public boolean receiveObject(int type, long objId, long instId, ByteBuffer data) throws IOException {
if (DEBUG) Log.d(TAG, "Received object ID: " + objId);
assert (objMngr != null);
@ -526,11 +560,13 @@ public class UAVTalk {
// All instances, not allowed for OBJ messages
if (!allInstances) {
if (DEBUG) Log.d(TAG,"Received object: " + objMngr.getObject(objId).getName());
// Get object and update its data
obj = updateObject(objId, instId, data);
// Check if an ack is pending
if (obj != null) {
updateAck(obj);
// Check if this is a response to a UAVTalk transaction
updateObjReq(obj);
} else {
error = true;
}
@ -581,7 +617,7 @@ public class UAVTalk {
// Check if object exists:
if (obj != null)
{
updateNack(obj);
receivedNack(obj);
}
else
{
@ -620,11 +656,13 @@ public class UAVTalk {
// Get object
UAVObject obj = objMngr.getObject(objId, instId);
// If the instance does not exist create it
if (obj == null) {
// Get the object type
UAVObject tobj = objMngr.getObject(objId);
if (tobj == null) {
// TODO: Return a NAK since we don't know this object
return null;
}
// Make sure this is a data object
@ -661,32 +699,62 @@ public class UAVTalk {
}
/**
* Check if a transaction is pending and if yes complete it.
* Called when an object is received to check if this completes
* a UAVTalk transaction
*/
void updateNack(UAVObject obj)
{
if (DEBUG) Log.d(TAG, "NACK received: " + obj.getName());
private synchronized void updateObjReq(UAVObject obj) {
// Check if this is not a possible candidate
Assert.assertNotNull(obj);
//obj.transactionCompleted(false);
if (respObj != null && respObj.getObjID() == obj.getObjID() &&
(respObj.getInstID() == obj.getInstID() || respAllInstances)) {
if (transactionListener != null)
transactionListener.TransactionFailed(obj);
if(respObj != null && respType == TYPE_OBJ_REQ && respObj.getObjID() == obj.getObjID() &&
((respObj.getInstID() == obj.getInstID() || !respAllInstances))) {
// Indicate complete
respObj = null;
// Notify listener
if (transactionListener != null)
transactionListener.TransactionSucceeded(obj);
}
}
/**
* Check if a transaction is pending and if yes complete it.
*/
synchronized void updateAck(UAVObject obj) {
if (DEBUG) Log.d(TAG, "ACK received: " + obj.getName());
private synchronized void receivedNack(UAVObject obj)
{
Assert.assertNotNull(obj);
if(respObj != null && (respType == TYPE_OBJ_REQ || respType == TYPE_OBJ_ACK ) &&
respObj.getObjID() == obj.getObjID()) {
if (DEBUG) Log.d(TAG, "NAK: " + obj.getName());
// Indicate complete
respObj = null;
// Notify listener
if (transactionListener != null)
transactionListener.TransactionFailed(obj);
}
}
/**
* Check if a transaction is pending that this acked object corresponds to
* and if yes complete it.
*/
private synchronized void updateAck(UAVObject obj) {
if (DEBUG) Log.d(TAG, "Received ack: " + obj.getName());
Assert.assertNotNull(obj);
if (respObj != null && respObj.getObjID() == obj.getObjID()
&& (respObj.getInstID() == obj.getInstID() || respAllInstances)) {
// Indicate complete
respObj = null;
// Notify listener
if (transactionListener != null)
transactionListener.TransactionSucceeded(obj);
respObj = null;
}
}
@ -698,7 +766,7 @@ public class UAVTalk {
* @return Success (true), Failure (false)
* @throws IOException
*/
public synchronized boolean transmitObject(UAVObject obj, int type, boolean allInstances) throws IOException {
private boolean transmitObject(UAVObject obj, int type, boolean allInstances) throws IOException {
// If all instances are requested on a single instance object it is an
// error
if (allInstances && obj.isSingleInstance()) {
@ -712,6 +780,10 @@ public class UAVTalk {
int numInst = objMngr.getNumInstances(obj.getObjID());
// Send all instances
for (int instId = 0; instId < numInst; ++instId) {
// TODO: This code is buggy probably. We should send each request
// and wait for an ack in the case of an TYPE_OBJ_ACK
Assert.assertNotSame(type, TYPE_OBJ_ACK); // catch any buggy calls
UAVObject inst = objMngr.getObject(obj.getObjID(), instId);
transmitSingleObject(inst, type, false);
}
@ -738,8 +810,7 @@ public class UAVTalk {
* @param[in] obj Object handle to send
* @param[in] type Transaction type \return Success (true), Failure (false)
*/
public synchronized boolean transmitSingleObject(UAVObject obj, int type,
boolean allInstances) throws IOException {
private boolean transmitSingleObject(UAVObject obj, int type, boolean allInstances) throws IOException {
int length;
int allInstId = ALL_INSTANCES;
@ -793,6 +864,13 @@ public class UAVTalk {
bbuf.position(0);
byte[] dst = new byte[packlen];
bbuf.get(dst, 0, packlen);
if (type == TYPE_OBJ_ACK || type == TYPE_OBJ_REQ) {
// Once we send a UAVTalk packet that requires an ack or object let's set up
// the transaction here
setupTransaction(obj, allInstances, type);
}
outStream.write(dst);