1
0
mirror of https://bitbucket.org/librepilot/librepilot.git synced 2025-01-17 02:52:12 +01:00

reimplemented delayed callback scheduler to allow dispatching from ISR's and multiple schedulers with different task priorities

This commit is contained in:
Corvus Corax 2013-04-14 04:16:34 +02:00
parent bbb83752f7
commit 87ed9a8be2
3 changed files with 193 additions and 64 deletions

View File

@ -2,7 +2,7 @@
******************************************************************************
*
* @file callbackscheduler.c
* @author The OpenPilot Team, http://www.openpilot.org Copyright (C) 2010.
* @author The OpenPilot Team, http://www.openpilot.org Copyright (C) 2013.
* @brief Scheduler to run callback functions from a shared context with given priorities.
*
* @see The GNU Public License (GPL) Version 3
@ -27,69 +27,93 @@
#include "openpilot.h"
// Private constants
#define MAX_QUEUE_SIZE 1
#define STACK_SIZE 128
#define TASK_PRIORITY (tskIDLE_PRIORITY + 1) // we run at regular update speed
#define MAX_UPDATE_PERIOD_MS 1000
// Private types
/**
* task information
*/
struct DelayedCallbackTaskStruct {
DelayedCallbackInfo *callbackQueue[CALLBACK_PRIORITY_LOW+1];
DelayedCallbackInfo *queueCursor[CALLBACK_PRIORITY_LOW+1];
xTaskHandle callbackSchedulerTaskHandle;
signed char name[3];
uint32_t stackSize;
long taskPriority;
xSemaphoreHandle signal;
struct DelayedCallbackTaskStruct *next;
};
/**
* callback information
*/
struct DelayedCallbackInfoStruct {
DelayedCallback cb;
bool volatile waiting;
struct DelayedCallbackTaskStruct *task;
struct DelayedCallbackInfoStruct *next;
};
// Private variables
static DelayedCallbackInfo *callbackQueue[CALLBACK_PRIORITY_LOW+1];
static DelayedCallbackInfo *queueCursor[CALLBACK_PRIORITY_LOW+1];
static xQueueHandle queue;
static xTaskHandle callbackSchedulerTaskHandle;
static struct DelayedCallbackTaskStruct *schedulerTasks;
static xSemaphoreHandle mutex;
static uint32_t stack_size;
static bool scheduler_started;
static bool schedulerStarted;
// Private functions
static void CallbackSchedulerTask();
static bool runNextCallback(DelayedCallbackPriority priority);
static void CallbackSchedulerTask(void *task);
static bool runNextCallback(struct DelayedCallbackTaskStruct *task, DelayedCallbackPriority priority);
/**
* Initialize the dispatcher
* Initialize the scheduler
* must be called before any othyer functions are called
* \return Success (0), failure (-1)
*/
int32_t CallbackSchedulerInitialize()
{
// Initialize variables
for (DelayedCallbackPriority p=0; p<=CALLBACK_PRIORITY_LOW; p++) callbackQueue[p] = NULL;
schedulerTasks = NULL;
schedulerStarted = false;
// Create mutex
mutex = xSemaphoreCreateRecursiveMutex();
if (mutex == NULL)
if (mutex == NULL) {
return -1;
// Create event queue (dummy queue for self signals)
queue = xQueueCreate(1,sizeof(DelayedCallbackInfo*));
//minimal stack_size
stack_size = STACK_SIZE;
scheduler_started = false;
}
// Done
return 0;
}
/**
* Start the scheduler
* Will
* \return Success (0), failure (-1)
*/
int32_t CallbackSchedulerStart()
{
// Create task
scheduler_started = true;
xTaskCreate( CallbackSchedulerTask, (signed char*)"CallbackScheduler", stack_size/4, NULL, TASK_PRIORITY, &callbackSchedulerTaskHandle );
TaskMonitorAdd(TASKINFO_RUNNING_CALLBACKSCHEDULER, callbackSchedulerTaskHandle);
xSemaphoreTakeRecursive(mutex, portMAX_DELAY);
// only call once
PIOS_Assert(schedulerStarted==false);
// start tasks
struct DelayedCallbackTaskStruct *cursor;
int t=0;
LL_FOREACH(schedulerTasks,cursor) {
xTaskCreate( CallbackSchedulerTask, cursor->name, cursor->stackSize/4, cursor, cursor->taskPriority, &cursor->callbackSchedulerTaskHandle );
if (TASKINFO_RUNNING_CALLBACKSCHEDULER0+t <= TASKINFO_RUNNING_CALLBACKSCHEDULER3) {
TaskMonitorAdd(TASKINFO_RUNNING_CALLBACKSCHEDULER0 + t, cursor->callbackSchedulerTaskHandle);
}
t++;
}
schedulerStarted = true;
xSemaphoreGiveRecursive(mutex);
return 0;
}
@ -97,39 +121,130 @@ int32_t CallbackSchedulerStart()
* Dispatch an event by invoking the supplied callback. The function
* returns imidiatelly, the callback is invoked from the event task.
* \param[in] cbinfo the callback handle
* \return Success (0), failure (-1)
* \return Success (-1), failure (0)
*/
int32_t DelayedCallbackDispatch(DelayedCallbackInfo *cbinfo)
{
// no semaphore needed
PIOS_Assert(cbinfo);
// no semaphore needed for the callback
cbinfo->waiting=true;
// Push to queue
return xQueueSend(queue, &cbinfo, 0); // will not block if queue is full
// but the scheduler as a whole needs to be notified
return xSemaphoreGive(cbinfo->task->signal);
}
/**
* Dispatch an event at periodic intervals.
* \param[in] ev The event to be dispatched
* Dispatch an event by invoking the supplied callback. The function
* returns imidiatelly, the callback is invoked from the event task.
* \param[in] cbinfo the callback handle
* \param[in] pxHigherPriorityTaskWoken
* xSemaphoreGiveFromISR() will set *pxHigherPriorityTaskWoken to pdTRUE if
* giving the semaphore caused a task to unblock, and the unblocked task has a
* priority higher than the currently running task. If xSemaphoreGiveFromISR()
* sets this value to pdTRUE then a context switch should be requested before
* the interrupt is exited.
* From FreeRTOS Docu: Context switching from an ISR uses port specific syntax.
* Check the demo task for your port to find the syntax required.
* \return Success (-1), failure (0)
*/
int32_t DelayedCallbackDispatchFromISR(DelayedCallbackInfo *cbinfo, long *pxHigherPriorityTaskWoken)
{
PIOS_Assert(cbinfo);
// no semaphore needed for the callback
cbinfo->waiting=true;
// but the scheduler as a whole needs to be notified
return xSemaphoreGiveFromISR(cbinfo->task->signal, pxHigherPriorityTaskWoken);
}
/**
* Register a new callback to be called by a delayed callback scheduler task
* \param[in] cb The callback to be invoked
* \param[in] periodMs The period the event is generated
* \param[in] priority Priority of the callback compared to other callbacks called by the same delayed callback scheduler task.
* \param[in] taskPriority Priority of the scheduler task. One scheduler task will be spawned for each distinct task priority in use, further callbacks with the same task priority will be handled by the same delayed callback scheduler task alternatingly according to their callback priority.
* \param[in] stacksize The stack requirements of the callback when called by the scheduler.
* \return Success (0), failure (-1)
*/
DelayedCallbackInfo* DelayedCallbackCreate(DelayedCallback cb, DelayedCallbackPriority priority, uint32_t stacksize)
DelayedCallbackInfo* DelayedCallbackCreate(DelayedCallback cb, DelayedCallbackPriority priority, long taskPriority, uint32_t stacksize)
{
xSemaphoreTakeRecursive(mutex, portMAX_DELAY);
if (!scheduler_started && stacksize>stack_size) stack_size=stacksize; // during system initialisation we can adapt to the maximum needed stack
xSemaphoreGiveRecursive(mutex);
// find appropriate scheduler task matching taskPriority
struct DelayedCallbackTaskStruct *task=NULL;
int t=0;
LL_FOREACH(schedulerTasks, task) {
if (task->taskPriority == taskPriority) {
break; // found
}
t++;
}
// if scheduler task for given taskPriority does not exist, create it
if (!task) {
// allocate memory if possible
task = (struct DelayedCallbackTaskStruct*)pvPortMalloc(sizeof(struct DelayedCallbackTaskStruct));
if (!task) {
xSemaphoreGiveRecursive(mutex);
return NULL;
}
if (stacksize>stack_size) return NULL; // error - not enough memory
// initialize structure
for (DelayedCallbackPriority p=0; p<=CALLBACK_PRIORITY_LOW; p++) {
task->callbackQueue[p] = NULL;
task->queueCursor[p] = NULL;
}
task->name[0] = 'C';
task->name[1] = 'a'+t;
task->name[2] = 0;
task->stackSize = ((STACK_SIZE>stacksize)?STACK_SIZE:stacksize);
task->taskPriority = taskPriority;
task->next = NULL;
// create process info
// create the signaling semaphore
vSemaphoreCreateBinary( task->signal );
if (!task->signal) {
xSemaphoreGiveRecursive(mutex);
return NULL;
}
// add to list of scheduler tasks
LL_APPEND(schedulerTasks, task);
// Previously registered tasks are spawned when CallbackSchedulerStart() is called.
// Tasks registered afterwards need to spawn upon creation.
if (schedulerStarted) {
xTaskCreate( CallbackSchedulerTask, task->name, task->stackSize/4, task, task->taskPriority, &task->callbackSchedulerTaskHandle );
if (TASKINFO_RUNNING_CALLBACKSCHEDULER0 + t <= TASKINFO_RUNNING_CALLBACKSCHEDULER3) {
TaskMonitorAdd(TASKINFO_RUNNING_CALLBACKSCHEDULER0 + t, task->callbackSchedulerTaskHandle);
}
}
}
if (!schedulerStarted && stacksize > task->stackSize) {
task->stackSize = stacksize; // previous to task initialisation we can still adapt to the maximum needed stack
}
if (stacksize > task->stackSize) {
xSemaphoreGiveRecursive(mutex);
return NULL; // error - not enough memory
}
// initialize callback scheduling info
DelayedCallbackInfo *info = (DelayedCallbackInfo*)pvPortMalloc(sizeof(DelayedCallbackInfo));
if (!info) {
xSemaphoreGiveRecursive(mutex);
return NULL; // error - not enough memory
}
info->next = NULL;
info->waiting = false;
info->task = task;
info->cb = cb;
// add to scheduling queue
LL_APPEND(callbackQueue[priority],info);
LL_APPEND(task->callbackQueue[priority],info);
xSemaphoreGiveRecursive(mutex);
return info;
}
@ -137,38 +252,41 @@ DelayedCallbackInfo* DelayedCallbackCreate(DelayedCallback cb, DelayedCallbackPr
/**
* Scheduler subtask
* \param[in] priority The scheduling priority of the callback to search for
* \return Callback has been executed (boolean)
*/
static bool runNextCallback(DelayedCallbackPriority priority) {
static bool runNextCallback(struct DelayedCallbackTaskStruct *task, DelayedCallbackPriority priority) {
// no such queue
if (priority>CALLBACK_PRIORITY_LOW) return false;
if (priority > CALLBACK_PRIORITY_LOW) {
return false;
}
// queue is empty, search a lower priority queue
if (callbackQueue[priority]==NULL)
return runNextCallback(priority+1);
if (task->callbackQueue[priority] == NULL) {
return runNextCallback(task, priority + 1);
}
DelayedCallbackInfo *current=queueCursor[priority];
DelayedCallbackInfo *current = task->queueCursor[priority];
DelayedCallbackInfo *next;
do {
if (current==NULL) {
next=callbackQueue[priority]; // loop around the end of the list
next=task->callbackQueue[priority]; // loop around the end of the list
// also attempt to run a callback that has lower priority
// every time the queue is completely traversed
if (runNextCallback(priority+1)) {
queueCursor[priority]=next;
if (runNextCallback(task, priority + 1)) {
task->queueCursor[priority] = next;
return true;
}
} else {
next=current->next;
next = current->next;
if (current->waiting) {
queueCursor[priority]=next;
current->waiting=false; // flag is reset just before execution.
task->queueCursor[priority] = next;
current->waiting = false; // flag is reset just before execution.
current->cb(); // call the callback
return true;
}
}
current=next;
} while (current!=queueCursor[priority]);
current = next;
} while (current != task->queueCursor[priority]);
// once the list has been traversed entirely once, abort (nothing to do)
return false;
}
@ -176,15 +294,16 @@ static bool runNextCallback(DelayedCallbackPriority priority) {
/**
* Scheduler task, responsible of invoking callbacks.
* \param[in] task The scheduling task being run
*/
static void CallbackSchedulerTask()
static void CallbackSchedulerTask(void *task)
{
static DelayedCallbackInfo *dummy;
while (1) {
if (!runNextCallback(CALLBACK_PRIORITY_CRITICAL)) {
if (!runNextCallback((struct DelayedCallbackTaskStruct*) task,CALLBACK_PRIORITY_CRITICAL)) {
// queue has no entries, sleep
xQueueReceive(queue, &dummy, MAX_UPDATE_PERIOD_MS/portTICK_RATE_MS);
xSemaphoreTake( ((struct DelayedCallbackTaskStruct*)task)->signal, portMAX_DELAY);
}
}
}

View File

@ -39,6 +39,7 @@ typedef struct DelayedCallbackInfoStruct DelayedCallbackInfo;
int32_t CallbackSchedulerInitialize();
int32_t CallbackSchedulerStart();
int32_t DelayedCallbackDispatch(DelayedCallbackInfo *cbinfo);
DelayedCallbackInfo* DelayedCallbackCreate(DelayedCallback cb, DelayedCallbackPriority priority, uint32_t stacksize);
int32_t DelayedCallbackDispatchFromISR(DelayedCallbackInfo *cbinfo, long *pxHigherPriorityTaskWoken);
DelayedCallbackInfo* DelayedCallbackCreate(DelayedCallback cb, DelayedCallbackPriority priority, long taskPriority, uint32_t stacksize);
#endif // CALLBACKSCHEDULER_H

View File

@ -27,7 +27,10 @@
<elementname>ModemStat</elementname>
<elementname>Autotune</elementname>
<elementname>EventDispatcher</elementname>
<elementname>CallbackScheduler</elementname>
<elementname>CallbackScheduler0</elementname>
<elementname>CallbackScheduler1</elementname>
<elementname>CallbackScheduler2</elementname>
<elementname>CallbackScheduler3</elementname>
</elementnames>
</field>
<field name="Running" units="bool" type="enum">
@ -56,7 +59,10 @@
<elementname>ModemStat</elementname>
<elementname>Autotune</elementname>
<elementname>EventDispatcher</elementname>
<elementname>CallbackScheduler</elementname>
<elementname>CallbackScheduler0</elementname>
<elementname>CallbackScheduler1</elementname>
<elementname>CallbackScheduler2</elementname>
<elementname>CallbackScheduler3</elementname>
</elementnames>
<options>
<option>False</option>
@ -89,7 +95,10 @@
<elementname>ModemStat</elementname>
<elementname>Autotune</elementname>
<elementname>EventDispatcher</elementname>
<elementname>CallbackScheduler</elementname>
<elementname>CallbackScheduler0</elementname>
<elementname>CallbackScheduler1</elementname>
<elementname>CallbackScheduler2</elementname>
<elementname>CallbackScheduler3</elementname>
</elementnames>
</field>
<access gcs="readwrite" flight="readwrite"/>