1
0
mirror of https://bitbucket.org/librepilot/librepilot.git synced 2025-01-18 03:52:11 +01:00
LibrePilot/flight/uavobjects/eventdispatcher.c

384 lines
13 KiB
C

/**
******************************************************************************
*
* @file eventdispatcher.c
* @author The OpenPilot Team, http://www.openpilot.org Copyright (C) 2010.
* @brief Event dispatcher, distributes object events as callbacks. Alternative
* to using tasks and queues. All callbacks are invoked from the event task.
* @see The GNU Public License (GPL) Version 3
*
*****************************************************************************/
/*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <openpilot.h>
#include <taskinfo.h>
// Private constants
#if defined(PIOS_EVENTDISAPTCHER_QUEUE)
#define MAX_QUEUE_SIZE PIOS_EVENTDISAPTCHER_QUEUE
#else
#define MAX_QUEUE_SIZE 20
#endif
#if defined(PIOS_EVENTDISPATCHER_STACK_SIZE)
#define STACK_SIZE PIOS_EVENTDISPATCHER_STACK_SIZE
#else
#define STACK_SIZE configMINIMAL_STACK_SIZE
#endif /* PIOS_EVENTDISPATCHER_STACK_SIZE */
#define CALLBACK_PRIORITY CALLBACK_PRIORITY_CRITICAL
#define TASK_PRIORITY CALLBACK_TASK_FLIGHTCONTROL
#define MAX_UPDATE_PERIOD_MS 1000
// Private types
/**
* Event callback information
*/
typedef struct {
UAVObjEvent ev; /** The actual event */
UAVObjEventCallback cb; /** The callback function, or zero if none */
xQueueHandle queue; /** The queue or zero if none */
} EventCallbackInfo;
/**
* List of object properties that are needed for the periodic updates.
*/
struct PeriodicObjectListStruct {
EventCallbackInfo evInfo; /** Event callback information */
uint16_t updatePeriodMs; /** Update period in ms or 0 if no periodic updates are needed */
int32_t timeToNextUpdateMs; /** Time delay to the next update */
struct PeriodicObjectListStruct *next; /** Needed by linked list library (utlist.h) */
};
typedef struct PeriodicObjectListStruct PeriodicObjectList;
// Private variables
static PeriodicObjectList *mObjList;
static xQueueHandle mQueue;
static DelayedCallbackInfo *eventSchedulerCallback;
static xSemaphoreHandle mMutex;
static EventStats mStats;
// Private functions
static int32_t processPeriodicUpdates();
static void eventTask();
static int32_t eventPeriodicCreate(UAVObjEvent *ev, UAVObjEventCallback cb, xQueueHandle queue, uint16_t periodMs);
static int32_t eventPeriodicUpdate(UAVObjEvent *ev, UAVObjEventCallback cb, xQueueHandle queue, uint16_t periodMs);
static uint16_t randomizePeriod(uint16_t periodMs);
/**
* Initialize the dispatcher
* \return Success (0), failure (-1)
*/
int32_t EventDispatcherInitialize()
{
// Initialize variables
mObjList = NULL;
memset(&mStats, 0, sizeof(EventStats));
// Create mMutex
mMutex = xSemaphoreCreateRecursiveMutex();
if (mMutex == NULL) {
return -1;
}
// Create event queue
mQueue = xQueueCreate(MAX_QUEUE_SIZE, sizeof(EventCallbackInfo));
// Create callback
eventSchedulerCallback = DelayedCallbackCreate(&eventTask, CALLBACK_PRIORITY, TASK_PRIORITY, STACK_SIZE * 4);
DelayedCallbackDispatch(eventSchedulerCallback);
// Done
return 0;
}
/**
* Get the statistics counters
* @param[out] statsOut The statistics counters will be copied there
*/
void EventGetStats(EventStats *statsOut)
{
xSemaphoreTakeRecursive(mMutex, portMAX_DELAY);
memcpy(statsOut, &mStats, sizeof(EventStats));
xSemaphoreGiveRecursive(mMutex);
}
/**
* Clear the statistics counters
*/
void EventClearStats()
{
xSemaphoreTakeRecursive(mMutex, portMAX_DELAY);
memset(&mStats, 0, sizeof(EventStats));
xSemaphoreGiveRecursive(mMutex);
}
/**
* Dispatch an event by invoking the supplied callback. The function
* returns imidiatelly, the callback is invoked from the event task.
* \param[in] ev The event to be dispatched
* \param[in] cb The callback function
* \return Success (0), failure (-1)
*/
int32_t EventCallbackDispatch(UAVObjEvent *ev, UAVObjEventCallback cb)
{
EventCallbackInfo evInfo;
// Initialize event callback information
memcpy(&evInfo.ev, ev, sizeof(UAVObjEvent));
evInfo.cb = cb;
evInfo.queue = 0;
// Push to queue
int32_t result = xQueueSend(mQueue, &evInfo, 0); // will not block if queue is full
DelayedCallbackDispatch(eventSchedulerCallback);
return result;
}
/**
* Dispatch an event at periodic intervals.
* \param[in] ev The event to be dispatched
* \param[in] cb The callback to be invoked
* \param[in] periodMs The period the event is generated
* \return Success (0), failure (-1)
*/
int32_t EventPeriodicCallbackCreate(UAVObjEvent *ev, UAVObjEventCallback cb, uint16_t periodMs)
{
return eventPeriodicCreate(ev, cb, 0, periodMs);
}
/**
* Update the period of a periodic event.
* \param[in] ev The event to be dispatched
* \param[in] cb The callback to be invoked
* \param[in] periodMs The period the event is generated
* \return Success (0), failure (-1)
*/
int32_t EventPeriodicCallbackUpdate(UAVObjEvent *ev, UAVObjEventCallback cb, uint16_t periodMs)
{
return eventPeriodicUpdate(ev, cb, 0, periodMs);
}
/**
* Dispatch an event at periodic intervals.
* \param[in] ev The event to be dispatched
* \param[in] queue The queue that the event will be pushed in
* \param[in] periodMs The period the event is generated
* \return Success (0), failure (-1)
*/
int32_t EventPeriodicQueueCreate(UAVObjEvent *ev, xQueueHandle queue, uint16_t periodMs)
{
return eventPeriodicCreate(ev, 0, queue, periodMs);
}
/**
* Update the period of a periodic event.
* \param[in] ev The event to be dispatched
* \param[in] queue The queue
* \param[in] periodMs The period the event is generated
* \return Success (0), failure (-1)
*/
int32_t EventPeriodicQueueUpdate(UAVObjEvent *ev, xQueueHandle queue, uint16_t periodMs)
{
return eventPeriodicUpdate(ev, 0, queue, periodMs);
}
/**
* Dispatch an event through a callback at periodic intervals.
* \param[in] ev The event to be dispatched
* \param[in] cb The callback to be invoked or zero if none
* \param[in] queue The queue or zero if none
* \param[in] periodMs The period the event is generated
* \return Success (0), failure (-1)
*/
static int32_t eventPeriodicCreate(UAVObjEvent *ev, UAVObjEventCallback cb, xQueueHandle queue, uint16_t periodMs)
{
PeriodicObjectList *objEntry;
// Get lock
xSemaphoreTakeRecursive(mMutex, portMAX_DELAY);
// Check that the object is not already connected
LL_FOREACH(mObjList, objEntry) {
if (objEntry->evInfo.cb == cb &&
objEntry->evInfo.queue == queue &&
objEntry->evInfo.ev.obj == ev->obj &&
objEntry->evInfo.ev.instId == ev->instId &&
objEntry->evInfo.ev.event == ev->event) {
// Already registered, do nothing
xSemaphoreGiveRecursive(mMutex);
return -1;
}
}
// Create handle
objEntry = (PeriodicObjectList *)pvPortMalloc(sizeof(PeriodicObjectList));
if (objEntry == NULL) {
return -1;
}
objEntry->evInfo.ev.obj = ev->obj;
objEntry->evInfo.ev.instId = ev->instId;
objEntry->evInfo.ev.event = ev->event;
objEntry->evInfo.cb = cb;
objEntry->evInfo.queue = queue;
objEntry->updatePeriodMs = periodMs;
objEntry->timeToNextUpdateMs = randomizePeriod(periodMs); // avoid bunching of updates
// Add to list
LL_APPEND(mObjList, objEntry);
// Release lock
xSemaphoreGiveRecursive(mMutex);
return 0;
}
/**
* Update the period of a periodic event.
* \param[in] ev The event to be dispatched
* \param[in] cb The callback to be invoked or zero if none
* \param[in] queue The queue or zero if none
* \param[in] periodMs The period the event is generated
* \return Success (0), failure (-1)
*/
static int32_t eventPeriodicUpdate(UAVObjEvent *ev, UAVObjEventCallback cb, xQueueHandle queue, uint16_t periodMs)
{
PeriodicObjectList *objEntry;
// Get lock
xSemaphoreTakeRecursive(mMutex, portMAX_DELAY);
// Find object
LL_FOREACH(mObjList, objEntry) {
if (objEntry->evInfo.cb == cb &&
objEntry->evInfo.queue == queue &&
objEntry->evInfo.ev.obj == ev->obj &&
objEntry->evInfo.ev.instId == ev->instId &&
objEntry->evInfo.ev.event == ev->event) {
// Object found, update period
objEntry->updatePeriodMs = periodMs;
objEntry->timeToNextUpdateMs = randomizePeriod(periodMs); // avoid bunching of updates
// Release lock
xSemaphoreGiveRecursive(mMutex);
return 0;
}
}
// If this point is reached the object was not found
xSemaphoreGiveRecursive(mMutex);
return -1;
}
/**
* Delayed event callback, responsible of invoking (event) callbacks.
*/
static void eventTask()
{
static uint32_t timeToNextUpdateMs = 0;
EventCallbackInfo evInfo;
// Wait for queue message
int limit = MAX_QUEUE_SIZE;
while (xQueueReceive(mQueue, &evInfo, 0) == pdTRUE) {
// Invoke callback, if any
if (evInfo.cb != 0) {
evInfo.cb(&evInfo.ev); // the function is expected to copy the event information
}
// limit loop to max queue size to slightly reduce the impact of recursive events
if (!--limit) {
break;
}
}
// Process periodic updates
if ((xTaskGetTickCount() * portTICK_RATE_MS) >= timeToNextUpdateMs) {
timeToNextUpdateMs = processPeriodicUpdates();
}
DelayedCallbackSchedule(eventSchedulerCallback, timeToNextUpdateMs - (xTaskGetTickCount() * portTICK_RATE_MS), CALLBACK_UPDATEMODE_SOONER);
}
/**
* Handle periodic updates for all objects.
* \return The system time until the next update (in ms) or -1 if failed
*/
static int32_t processPeriodicUpdates()
{
PeriodicObjectList *objEntry;
int32_t timeNow;
int32_t timeToNextUpdate;
int32_t offset;
// Get lock
xSemaphoreTakeRecursive(mMutex, portMAX_DELAY);
// Iterate through each object and update its timer, if zero then transmit object.
// Also calculate smallest delay to next update.
timeToNextUpdate = xTaskGetTickCount() * portTICK_RATE_MS + MAX_UPDATE_PERIOD_MS;
LL_FOREACH(mObjList, objEntry) {
// If object is configured for periodic updates
if (objEntry->updatePeriodMs > 0) {
// Check if time for the next update
timeNow = xTaskGetTickCount() * portTICK_RATE_MS;
if (objEntry->timeToNextUpdateMs <= timeNow) {
// Reset timer
offset = (timeNow - objEntry->timeToNextUpdateMs) % objEntry->updatePeriodMs;
objEntry->timeToNextUpdateMs = timeNow + objEntry->updatePeriodMs - offset;
// Invoke callback, if one
if (objEntry->evInfo.cb != 0) {
objEntry->evInfo.cb(&objEntry->evInfo.ev); // the function is expected to copy the event information
}
// Push event to queue, if one
if (objEntry->evInfo.queue != 0) {
if (xQueueSend(objEntry->evInfo.queue, &objEntry->evInfo.ev, 0) != pdTRUE) { // do not block if queue is full
if (objEntry->evInfo.ev.obj != NULL) {
mStats.lastErrorID = UAVObjGetID(objEntry->evInfo.ev.obj);
}
++mStats.eventErrors;
}
}
}
// Update minimum delay
if (objEntry->timeToNextUpdateMs < timeToNextUpdate) {
timeToNextUpdate = objEntry->timeToNextUpdateMs;
}
}
}
// Done
xSemaphoreGiveRecursive(mMutex);
return timeToNextUpdate;
}
/**
* Return a psedorandom integer from 0 to periodMs
* Based on the Park-Miller-Carta Pseudo-Random Number Generator
* http://www.firstpr.com.au/dsp/rand31/
*/
static uint16_t randomizePeriod(uint16_t periodMs)
{
static uint32_t seed = 1;
uint32_t hi, lo;
lo = 16807 * (seed & 0xFFFF);
hi = 16807 * (seed >> 16);
lo += (hi & 0x7FFF) << 16;
lo += hi >> 15;
if (lo > 0x7FFFFFFF) {
lo -= 0x7FFFFFFF;
}
seed = lo;
return (uint16_t)(((float)periodMs * (float)lo) / (float)0x7FFFFFFF);
}