mirror of
https://bitbucket.org/librepilot/librepilot.git
synced 2025-01-29 14:52:12 +01:00
AndroidGCS Telemetry: Use a runnable for the transaction timeout. Now all of
telemetry is using handlers nicely, but we still can have multiple transactions queued for the same object.
This commit is contained in:
parent
1bbfb35446
commit
7f028f6d42
@ -126,7 +126,7 @@ public class Telemetry {
|
|||||||
ListIterator<List<UAVObject>> li = objs.listIterator();
|
ListIterator<List<UAVObject>> li = objs.listIterator();
|
||||||
while (li.hasNext())
|
while (li.hasNext())
|
||||||
registerObject(li.next().get(0)); // we only need to register one
|
registerObject(li.next().get(0)); // we only need to register one
|
||||||
// instance per object type
|
// instance per object type
|
||||||
|
|
||||||
// Listen to new object creations
|
// Listen to new object creations
|
||||||
objMngr.addNewInstanceObserver(new Observer() {
|
objMngr.addNewInstanceObserver(new Observer() {
|
||||||
@ -146,25 +146,15 @@ public class Telemetry {
|
|||||||
utalk.setOnTransactionCompletedListener(utalk.new OnTransactionCompletedListener() {
|
utalk.setOnTransactionCompletedListener(utalk.new OnTransactionCompletedListener() {
|
||||||
@Override
|
@Override
|
||||||
void TransactionSucceeded(UAVObject data) {
|
void TransactionSucceeded(UAVObject data) {
|
||||||
try {
|
transactionCompleted(data, true);
|
||||||
transactionCompleted(data, true);
|
|
||||||
} catch (IOException e) {
|
|
||||||
// Disconnect when stream fails
|
|
||||||
utalk.setOnTransactionCompletedListener(null);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void TransactionFailed(UAVObject data) {
|
void TransactionFailed(UAVObject data) {
|
||||||
try {
|
if (DEBUG)
|
||||||
if (DEBUG)
|
Log.d(TAG, "TransactionFailed(" + data.getName() + ")");
|
||||||
Log.d(TAG, "TransactionFailed(" + data.getName() + ")");
|
|
||||||
|
|
||||||
transactionCompleted(data, false);
|
transactionCompleted(data, false);
|
||||||
} catch (IOException e) {
|
|
||||||
// Disconnect when stream fails
|
|
||||||
utalk.setOnTransactionCompletedListener(null);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
@ -182,28 +172,6 @@ public class Telemetry {
|
|||||||
txRetries = 0;
|
txRetries = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void transTimerSetPeriod(int periodMs) {
|
|
||||||
if (transTimerTask != null)
|
|
||||||
transTimerTask.cancel();
|
|
||||||
|
|
||||||
if (transTimer != null)
|
|
||||||
transTimer.purge();
|
|
||||||
|
|
||||||
transTimer = new Timer();
|
|
||||||
|
|
||||||
transTimerTask = new TimerTask() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
transactionTimeout();
|
|
||||||
} catch (IOException e) {
|
|
||||||
cancel();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
transTimer.schedule(transTimerTask, periodMs, periodMs);
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized void updateTimerSetPeriod(int periodMs) {
|
synchronized void updateTimerSetPeriod(int periodMs) {
|
||||||
if (updateTimer != null) {
|
if (updateTimer != null) {
|
||||||
updateTimer.cancel();
|
updateTimer.cancel();
|
||||||
@ -354,7 +322,7 @@ public class Telemetry {
|
|||||||
eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ;
|
eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ;
|
||||||
if (obj.isMetadata())
|
if (obj.isMetadata())
|
||||||
eventMask |= EV_UNPACKED; // we also need to act on remote
|
eventMask |= EV_UNPACKED; // we also need to act on remote
|
||||||
// updates (unpack events)
|
// updates (unpack events)
|
||||||
|
|
||||||
connectToObjectInstances(obj, eventMask);
|
connectToObjectInstances(obj, eventMask);
|
||||||
} else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_ONCHANGE) {
|
} else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_ONCHANGE) {
|
||||||
@ -364,7 +332,7 @@ public class Telemetry {
|
|||||||
eventMask = EV_UPDATED | EV_UPDATED_MANUAL | EV_UPDATE_REQ;
|
eventMask = EV_UPDATED | EV_UPDATED_MANUAL | EV_UPDATE_REQ;
|
||||||
if (obj.isMetadata())
|
if (obj.isMetadata())
|
||||||
eventMask |= EV_UNPACKED; // we also need to act on remote
|
eventMask |= EV_UNPACKED; // we also need to act on remote
|
||||||
// updates (unpack events)
|
// updates (unpack events)
|
||||||
|
|
||||||
connectToObjectInstances(obj, eventMask);
|
connectToObjectInstances(obj, eventMask);
|
||||||
} else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_THROTTLED) {
|
} else if (metadata.GetGcsTelemetryUpdateMode() == UAVObject.UpdateMode.UPDATEMODE_THROTTLED) {
|
||||||
@ -376,106 +344,12 @@ public class Telemetry {
|
|||||||
eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ;
|
eventMask = EV_UPDATED_MANUAL | EV_UPDATE_REQ;
|
||||||
if (obj.isMetadata())
|
if (obj.isMetadata())
|
||||||
eventMask |= EV_UNPACKED; // we also need to act on remote
|
eventMask |= EV_UNPACKED; // we also need to act on remote
|
||||||
// updates (unpack events)
|
// updates (unpack events)
|
||||||
|
|
||||||
connectToObjectInstances(obj, eventMask);
|
connectToObjectInstances(obj, eventMask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Called when a transaction is successfully completed (uavtalk event)
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void transactionCompleted(UAVObject obj, boolean result)
|
|
||||||
throws IOException {
|
|
||||||
if (DEBUG) Log.d(TAG, "UAVTalk transactionCompleted");
|
|
||||||
|
|
||||||
// Check if there is a pending transaction and the objects match
|
|
||||||
if (transPending && transInfo.obj.getObjID() == obj.getObjID()) {
|
|
||||||
if (DEBUG) Log.d(TAG, "Telemetry: transaction completed for " + obj.getName());
|
|
||||||
|
|
||||||
// Complete transaction
|
|
||||||
synchronized(transTimer) {
|
|
||||||
transTimer.cancel();
|
|
||||||
transPending = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
//Send signal
|
|
||||||
obj.transactionCompleted(result);
|
|
||||||
} else {
|
|
||||||
if (ERROR) Log.e(TAG, "Error: received a transaction completed when did not expect it.");
|
|
||||||
transPending = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Called when a transaction is not completed within the timeout period
|
|
||||||
* (timer event)
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void transactionTimeout() throws IOException {
|
|
||||||
if (DEBUG)
|
|
||||||
Log.d(TAG, "Telemetry: transaction timeout.");
|
|
||||||
synchronized (transTimer) {
|
|
||||||
transTimer.cancel();
|
|
||||||
// Proceed only if there is a pending transaction
|
|
||||||
if (transPending) {
|
|
||||||
// Check if more retries are pending
|
|
||||||
if (transInfo.retriesRemaining > 0) {
|
|
||||||
--transInfo.retriesRemaining;
|
|
||||||
processObjectTransaction();
|
|
||||||
++txRetries;
|
|
||||||
} else {
|
|
||||||
if (ERROR)
|
|
||||||
Log.e(TAG, "Transaction failed for: " + transInfo.obj.getName());
|
|
||||||
|
|
||||||
// Terminate transaction. This triggers UAVTalk to send a transaction
|
|
||||||
// failed signal which will make the next queue entry be processed
|
|
||||||
// Note this is UAVTalk listener TransactionFailed function
|
|
||||||
// object specific transaction failed.
|
|
||||||
utalk.cancelPendingTransaction(transInfo.obj);
|
|
||||||
++txErrors;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start an object transaction with UAVTalk, all information is stored in
|
|
||||||
* transInfo
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void processObjectTransaction() throws IOException {
|
|
||||||
if (transPending) {
|
|
||||||
if (DEBUG)
|
|
||||||
Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName());
|
|
||||||
|
|
||||||
// Initiate transaction
|
|
||||||
if (transInfo.objRequest) {
|
|
||||||
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
|
|
||||||
} else {
|
|
||||||
utalk.sendObject(transInfo.obj, transInfo.acked,
|
|
||||||
transInfo.allInstances);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start timer if a response is expected
|
|
||||||
if (transInfo.objRequest || transInfo.acked) {
|
|
||||||
transTimerSetPeriod(REQ_TIMEOUT_MS);
|
|
||||||
} else {
|
|
||||||
synchronized (transTimer) {
|
|
||||||
transTimer.cancel();
|
|
||||||
transPending = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (ERROR)
|
|
||||||
Log.e(TAG, "Error: inside of processObjectTransaction with no transPending");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check is any objects are pending for periodic updates TODO: Clean-up
|
* Check is any objects are pending for periodic updates TODO: Clean-up
|
||||||
*
|
*
|
||||||
@ -513,6 +387,8 @@ public class Telemetry {
|
|||||||
- offset;
|
- offset;
|
||||||
// Send object
|
// Send object
|
||||||
startTime = System.currentTimeMillis();
|
startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
if (DEBUG) Log.d(TAG, "Manual update: " + objinfo.obj.getName());
|
||||||
handler.updatedManual(objinfo.obj);
|
handler.updatedManual(objinfo.obj);
|
||||||
// enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL,
|
// enqueueObjectUpdates(objinfo.obj, EV_UPDATED_MANUAL,
|
||||||
// true, false);
|
// true, false);
|
||||||
@ -584,12 +460,6 @@ public class Telemetry {
|
|||||||
if (updateTimer != null)
|
if (updateTimer != null)
|
||||||
updateTimer.cancel();
|
updateTimer.cancel();
|
||||||
updateTimer = null;
|
updateTimer = null;
|
||||||
if (transTimerTask != null)
|
|
||||||
transTimerTask.cancel();
|
|
||||||
transTimerTask = null;
|
|
||||||
if (transTimer != null)
|
|
||||||
transTimer.cancel();
|
|
||||||
transTimer = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -604,8 +474,6 @@ public class Telemetry {
|
|||||||
|
|
||||||
private Timer updateTimer;
|
private Timer updateTimer;
|
||||||
private TimerTask updateTimerTask;
|
private TimerTask updateTimerTask;
|
||||||
private Timer transTimer;
|
|
||||||
private TimerTask transTimerTask;
|
|
||||||
|
|
||||||
private int timeToNextUpdateMs;
|
private int timeToNextUpdateMs;
|
||||||
private int txErrors;
|
private int txErrors;
|
||||||
@ -664,14 +532,15 @@ public class Telemetry {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Perform an update on an object where on an event based on the contents provided
|
||||||
|
* to the constructors. This update will also set a timeout for transaction failure.
|
||||||
|
*/
|
||||||
class ObjectRunnable implements Runnable {
|
class ObjectRunnable implements Runnable {
|
||||||
|
|
||||||
// ! Transaction information to perform
|
// ! Transaction information to perform
|
||||||
private final ObjectQueueInfo objInfo;
|
private final ObjectQueueInfo objInfo;
|
||||||
|
|
||||||
// private final ObjectTransactionInfo transInfo = new
|
|
||||||
// ObjectTransactionInfo();
|
|
||||||
|
|
||||||
ObjectRunnable(ObjectQueueInfo info) {
|
ObjectRunnable(ObjectQueueInfo info) {
|
||||||
Assert.assertNotNull(info);
|
Assert.assertNotNull(info);
|
||||||
objInfo = info;
|
objInfo = info;
|
||||||
@ -725,48 +594,129 @@ public class Telemetry {
|
|||||||
// Determine if this will schedule a new transaction
|
// Determine if this will schedule a new transaction
|
||||||
newTransactionPending = !(newTrans.objRequest || newTrans.acked);
|
newTransactionPending = !(newTrans.objRequest || newTrans.acked);
|
||||||
|
|
||||||
// If there is a transaction pending and this would set up a new one reschedule it
|
|
||||||
if (transPending && newTransactionPending) {
|
|
||||||
if (WARN) Log.w(TAG, "Postponing transaction for" + newTrans.obj.getName());
|
|
||||||
handler.postDelayed(this, 100);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (transInfo) {
|
synchronized (transInfo) {
|
||||||
|
|
||||||
|
// If there is a transaction pending and this would set up a new one reschedule it
|
||||||
|
if (transPending && newTransactionPending) {
|
||||||
|
if (WARN) Log.w(TAG, "Postponing transaction for" + newTrans.obj.getName());
|
||||||
|
handler.postDelayed(this, 100);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store this as the active transaction
|
||||||
transPending = newTransactionPending;
|
transPending = newTransactionPending;
|
||||||
transInfo = newTrans;
|
transInfo = newTrans;
|
||||||
|
|
||||||
if (DEBUG) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName());
|
if (DEBUG) Log.d(TAG, "Process Object transaction for " + transInfo.obj.getName());
|
||||||
|
|
||||||
// 3. Execute transaction
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
// Initiate transaction
|
// 3. Execute transaction by sending the appropriate UAVTalk command
|
||||||
if (transInfo.objRequest) {
|
if (transInfo.objRequest) {
|
||||||
if (DEBUG) Log.d(TAG, "Sending object request");
|
if (DEBUG) Log.d(TAG, "Sending object request" + transInfo.obj.getName());
|
||||||
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
|
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
|
||||||
} else {
|
} else {
|
||||||
if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName() + " " + transInfo.obj.toStringData());
|
if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName());
|
||||||
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
|
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// TODO: Block if request expected (??)
|
|
||||||
if (transPending) {
|
|
||||||
transTimerSetPeriod(REQ_TIMEOUT_MS);
|
|
||||||
} else if (transTimer != null) {
|
|
||||||
synchronized(transTimer) {
|
|
||||||
transTimer.cancel();
|
|
||||||
transPending = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// TODO Auto-generated catch block
|
if (ERROR) Log.e(TAG, "Unable to send UAVTalk message");
|
||||||
Log.e(TAG, "E");
|
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Post a timeout timer if a response is epxected
|
||||||
|
if (transPending)
|
||||||
|
handler.postDelayed(transactionTimeout, REQ_TIMEOUT_MS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runnable posted to handle a timeout of a transaction. Tracks the number of retry attempts
|
||||||
|
* retries that many, and finally sends a transaction failed signal.
|
||||||
|
*/
|
||||||
|
final Runnable transactionTimeout = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// Lock on the transaction
|
||||||
|
synchronized (transInfo) {
|
||||||
|
|
||||||
|
// Proceed only if there is a pending transaction
|
||||||
|
if (!transPending) {
|
||||||
|
if (WARN) Log.w(TAG,"Transaction completed but timeout still called. Probable race condition");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (DEBUG) Log.d(TAG, "Telemetry: transaction timeout.");
|
||||||
|
|
||||||
|
// Check if more retries are pending
|
||||||
|
if (transInfo.retriesRemaining > 0) {
|
||||||
|
--transInfo.retriesRemaining;
|
||||||
|
|
||||||
|
// Repeat whatever is required for this transaction type
|
||||||
|
// (transInfo.objRequest) {
|
||||||
|
if (DEBUG) Log.d(TAG, "Sending object request");
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Execute transaction by sending the appropriate UAVTalk command
|
||||||
|
if (transInfo.objRequest) {
|
||||||
|
if (DEBUG) Log.d(TAG, "Sending object request" + transInfo.obj.getName());
|
||||||
|
utalk.sendObjectRequest(transInfo.obj, transInfo.allInstances);
|
||||||
|
} else {
|
||||||
|
if (DEBUG) Log.d(TAG, "Sending object " + transInfo.obj.getName());
|
||||||
|
utalk.sendObject(transInfo.obj, transInfo.acked, transInfo.allInstances);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (ERROR) Log.e(TAG, "Unable to send UAVTalk message");
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
handler.postDelayed(transactionTimeout, REQ_TIMEOUT_MS);
|
||||||
|
|
||||||
|
++txRetries;
|
||||||
|
} else {
|
||||||
|
if (ERROR) Log.e(TAG, "Transaction failed for: " + transInfo.obj.getName());
|
||||||
|
|
||||||
|
// Terminate transaction. This triggers UAVTalk to send a transaction
|
||||||
|
// failed signal which will make the next queue entry be processed
|
||||||
|
// Note this is UAVTalk listener TransactionFailed function
|
||||||
|
// object specific transaction failed.
|
||||||
|
utalk.cancelPendingTransaction(transInfo.obj);
|
||||||
|
++txErrors;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when a transaction is successfully completed (UAVTalk event) and maps that to
|
||||||
|
* the appropriate object event as well as canceling the pending transaction and timeout
|
||||||
|
*/
|
||||||
|
private void transactionCompleted(UAVObject obj, boolean result) {
|
||||||
|
|
||||||
|
if (DEBUG) Log.d(TAG, "UAVTalk transactionCompleted");
|
||||||
|
|
||||||
|
// Check if there is a pending transaction and the objects match
|
||||||
|
synchronized(transInfo) {
|
||||||
|
if (transPending && transInfo.obj.getObjID() == obj.getObjID()) {
|
||||||
|
if (DEBUG) Log.d(TAG, "Telemetry: transaction completed for " + obj.getName());
|
||||||
|
|
||||||
|
// Cancel timeout and complete transaction
|
||||||
|
handler.removeCallbacks(transactionTimeout);
|
||||||
|
transPending = false;
|
||||||
|
|
||||||
|
//Send signal
|
||||||
|
obj.transactionCompleted(result);
|
||||||
|
} else {
|
||||||
|
if (ERROR) Log.e(TAG, "Error: received a transaction completed when did not expect it.");
|
||||||
|
transPending = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user