/**
 ******************************************************************************
 *
 * @file       eventdispatcher.c
 * @author     The OpenPilot Team, http://www.openpilot.org Copyright (C) 2010.
 * @brief      Event dispatcher, distributes object events as callbacks. Alternative
 *             to using tasks and queues. All callbacks are invoked from the event task.
 * @see        The GNU Public License (GPL) Version 3
 *
 *****************************************************************************/
/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */

#include "openpilot.h"

// Private constants
#define MAX_QUEUE_SIZE 10         // Number of pending events the dispatch queue can hold
#define STACK_SIZE 100            // Stack depth passed to xTaskCreate for the event task
#define TASK_PRIORITY 100         // Priority passed to xTaskCreate for the event task
#define MAX_UPDATE_PERIOD_MS 1000 // Longest delay between two passes over the periodic list
#define MIN_UPDATE_PERIOD_MS 1    // Shortest delay between two passes over the periodic list

// Private types

/**
 * Event callback information
 */
typedef struct {
    UAVObjEvent ev;         /** The actual event */
    UAVObjEventCallback cb; /** The callback function */
} EventCallbackInfo;

/**
 * List of object properties that are needed for the periodic updates.
 */
struct PeriodicObjectListStruct {
    EventCallbackInfo evInfo;              /** Event callback information */
    int32_t updatePeriodMs;                /** Update period in ms or 0 if no periodic updates are needed */
    int32_t timeToNextUpdateMs;            /** Time delay to the next update */
    struct PeriodicObjectListStruct *next; /** Needed by linked list library (utlist.h) */
};
typedef struct PeriodicObjectListStruct PeriodicObjectList;

// Private variables
static PeriodicObjectList *objList; /** Head of the periodic event list, guarded by mutex */
static xQueueHandle queue;          /** Queue of EventCallbackInfo items consumed by the event task */
static xTaskHandle eventTaskHandle; /** Handle of the event task */
static xSemaphoreHandle mutex;      /** Recursive mutex protecting objList */

// Private functions
static PeriodicObjectList *findPeriodicEntry(UAVObjEvent *ev, UAVObjEventCallback cb);
static int32_t processPeriodicUpdates(void);
static void eventTask(void *parameters);

/**
 * Initialize the dispatcher
 * Creates the list mutex, the event queue and the event task.
 * Resources that were already created are not released if a later step fails.
 * \return Success (0), failure (-1)
 */
int32_t EventDispatcherInitialize(void)
{
    // Initialize list
    objList = NULL;

    // Create mutex
    mutex = xSemaphoreCreateRecursiveMutex();
    if (mutex == NULL) {
        return -1;
    }

    // Create event queue. Each item must be a complete EventCallbackInfo
    // (event + callback) because that is what EventDispatch() sends and what
    // the event task receives; sizing the items as a bare UAVObjEvent would
    // drop the callback pointer.
    queue = xQueueCreate(MAX_QUEUE_SIZE, sizeof(EventCallbackInfo));
    if (queue == NULL) {
        return -1;
    }

    // Create task
    if (xTaskCreate(eventTask, (signed char *)"Event", STACK_SIZE, NULL,
                    TASK_PRIORITY, &eventTaskHandle) != pdPASS) {
        return -1;
    }

    // Done
    return 0;
}

/**
 * Dispatch an event by invoking the supplied callback. The function
 * returns immediately, the callback is invoked from the event task.
 * \param[in] ev The event to be dispatched
 * \param[in] cb The callback function
 * \return Success (0), failure (-1) if an argument is NULL or the queue is full
 */
int32_t EventDispatch(UAVObjEvent *ev, UAVObjEventCallback cb)
{
    EventCallbackInfo evInfo;

    if (ev == NULL || cb == NULL) {
        return -1;
    }

    // Initialize event callback information
    memcpy(&evInfo.ev, ev, sizeof(UAVObjEvent));
    evInfo.cb = cb;

    // Push to queue, a zero timeout means the call will not block if the queue is full
    if (xQueueSend(queue, &evInfo, 0) != pdTRUE) {
        return -1;
    }
    return 0;
}

/**
 * Dispatch an event through a callback at periodic intervals.
 * \param[in] ev The event to be dispatched
 * \param[in] cb The callback to be invoked
 * \param[in] periodMs The period the event is generated
 * \return Success (0), failure (-1) if an argument is NULL, the event/callback
 *         pair is already registered or memory could not be allocated
 */
int32_t EventPeriodicCreate(UAVObjEvent *ev, UAVObjEventCallback cb, int32_t periodMs)
{
    PeriodicObjectList *objEntry;
    int32_t result = -1;

    if (ev == NULL || cb == NULL) {
        return -1;
    }

    // Get lock
    xSemaphoreTakeRecursive(mutex, portMAX_DELAY);

    // Only create a new entry if the object is not already connected
    if (findPeriodicEntry(ev, cb) == NULL) {
        objEntry = malloc(sizeof *objEntry);
        if (objEntry != NULL) {
            // memcpy (not struct assignment) so the stored bytes match what
            // findPeriodicEntry() later compares with memcmp
            memcpy(&objEntry->evInfo.ev, ev, sizeof(UAVObjEvent));
            objEntry->evInfo.cb = cb;
            objEntry->updatePeriodMs = periodMs;
            objEntry->timeToNextUpdateMs = 0;
            // Add to list
            LL_APPEND(objList, objEntry);
            result = 0;
        }
    }

    // Release lock on every path, including allocation failure
    xSemaphoreGiveRecursive(mutex);
    return result;
}

/**
 * Update the period of a periodic event.
 * \param[in] ev The event to be dispatched
 * \param[in] cb The callback to be invoked
 * \param[in] periodMs The period the event is generated
 * \return Success (0), failure (-1) if an argument is NULL or the event was not found
 */
int32_t EventPeriodicUpdate(UAVObjEvent *ev, UAVObjEventCallback cb, int32_t periodMs)
{
    PeriodicObjectList *objEntry;
    int32_t result = -1;

    if (ev == NULL || cb == NULL) {
        return -1;
    }

    // Get lock
    xSemaphoreTakeRecursive(mutex, portMAX_DELAY);

    // Find object
    objEntry = findPeriodicEntry(ev, cb);
    if (objEntry != NULL) {
        // Object found, update period and restart its timer
        objEntry->updatePeriodMs = periodMs;
        objEntry->timeToNextUpdateMs = 0;
        result = 0;
    }

    // Release lock
    xSemaphoreGiveRecursive(mutex);
    return result;
}

/**
 * Delete periodic event.
 * \param[in] ev The event to be dispatched
 * \param[in] cb The callback to be invoked
 * \return Success (0), failure (-1) if an argument is NULL or the event was not found
 */
int32_t EventPeriodicDelete(UAVObjEvent *ev, UAVObjEventCallback cb)
{
    PeriodicObjectList *objEntry;
    int32_t result = -1;

    if (ev == NULL || cb == NULL) {
        return -1;
    }

    // Get lock
    xSemaphoreTakeRecursive(mutex, portMAX_DELAY);

    // Find object
    objEntry = findPeriodicEntry(ev, cb);
    if (objEntry != NULL) {
        // Object found, remove from list
        LL_DELETE(objList, objEntry);
        free(objEntry);
        result = 0;
    }

    // Release lock
    xSemaphoreGiveRecursive(mutex);
    return result;
}

/**
 * Search the periodic list for an entry with the given event and callback.
 * The caller must hold the list mutex.
 * \param[in] ev The event to look for (compared byte-wise)
 * \param[in] cb The callback to look for
 * \return The matching entry or NULL if there is none
 */
static PeriodicObjectList *findPeriodicEntry(UAVObjEvent *ev, UAVObjEventCallback cb)
{
    PeriodicObjectList *objEntry;

    LL_FOREACH(objList, objEntry) {
        if (objEntry->evInfo.cb == cb &&
            memcmp(&objEntry->evInfo.ev, ev, sizeof(UAVObjEvent)) == 0) {
            return objEntry;
        }
    }
    return NULL;
}

/**
 * Event task, responsible of invoking callbacks.
 * Waits on the event queue until the next periodic pass is due, invokes the
 * callback of any received event and then runs the periodic pass when its
 * time has come.
 * \param[in] parameters Unused task argument
 */
static void eventTask(void *parameters)
{
    int32_t timeToNextUpdateMs;
    int32_t delayMs;
    EventCallbackInfo evInfo;

    (void)parameters;

    // Initialize time
    timeToNextUpdateMs = xTaskGetTickCount() * portTICK_RATE_MS;

    // Loop forever
    while (1) {
        // Calculate delay time
        delayMs = timeToNextUpdateMs - (xTaskGetTickCount() * portTICK_RATE_MS);
        if (delayMs < 0) {
            delayMs = 0;
        }

        // Wait for queue message
        if (xQueueReceive(queue, &evInfo, delayMs / portTICK_RATE_MS) == pdTRUE) {
            // Invoke callback
            evInfo.cb(&evInfo.ev); // the function is expected to copy the event information
        }

        // Process periodic updates
        if ((xTaskGetTickCount() * portTICK_RATE_MS) >= timeToNextUpdateMs) {
            timeToNextUpdateMs = processPeriodicUpdates();
        }
    }
}

/**
 * Handle periodic updates for all objects.
 * Callbacks are invoked while the list mutex is held and the entry is read
 * again after its callback returns.
 * \return The system time of the next update (in ms)
 */
static int32_t processPeriodicUpdates(void)
{
    static int32_t timeOfLastUpdate = 0;
    PeriodicObjectList *objEntry;
    int32_t delaySinceLastUpdateMs;
    int32_t minDelay = MAX_UPDATE_PERIOD_MS;

    // Get lock
    xSemaphoreTakeRecursive(mutex, portMAX_DELAY);

    // Iterate through each object and update its timer, if zero then transmit object.
    // Also calculate smallest delay to next update.
    delaySinceLastUpdateMs = xTaskGetTickCount() * portTICK_RATE_MS - timeOfLastUpdate;
    LL_FOREACH(objList, objEntry) {
        // If object is configured for periodic updates
        if (objEntry->updatePeriodMs > 0) {
            objEntry->timeToNextUpdateMs -= delaySinceLastUpdateMs;
            // Check if time for the next update
            if (objEntry->timeToNextUpdateMs <= 0) {
                // Reset timer
                objEntry->timeToNextUpdateMs = objEntry->updatePeriodMs;
                // Invoke callback
                objEntry->evInfo.cb(&objEntry->evInfo.ev); // the function is expected to copy the event information
            }
            // Update minimum delay
            if (objEntry->timeToNextUpdateMs < minDelay) {
                minDelay = objEntry->timeToNextUpdateMs;
            }
        }
    }

    // Check if delay for the next update is too short
    if (minDelay < MIN_UPDATE_PERIOD_MS) {
        minDelay = MIN_UPDATE_PERIOD_MS;
    }

    // Done
    timeOfLastUpdate = xTaskGetTickCount() * portTICK_RATE_MS;
    xSemaphoreGiveRecursive(mutex);
    return timeOfLastUpdate + minDelay;
}