/* * @file Worker.cpp * @author Robert Borzecki (robert.borzecki@mudita.com) * @date 30 maj 2019 * @brief * @copyright Copyright (C) 2019 mudita.com * @details */ #include extern "C" { #include "FreeRTOS.h" #include "task.h" } //module-sys #include "Worker.hpp" namespace sys { void workerTaskFunction(void *ptr) { Worker *worker = reinterpret_cast(ptr); QueueSetMemberHandle_t activeMember; std::vector queues = worker->queues; while (1) { activeMember = xQueueSelectFromSet(worker->queueSet, portMAX_DELAY); // find id of the queue that was activated for (uint32_t i = 0; i < queues.size(); i++) { if (queues[i] == activeMember) { worker->handleMessage(i); } } } } Worker::Worker( sys::Service* service ) : service {service }, serviceQueue{ NULL }, queueSet{ NULL }, taskHandle{ NULL } { } Worker::~Worker() { } bool Worker::init( std::list queuesList ) { //initial value is because there is always a queue to communicate with service uint32_t setSize = SERVICE_QUEUE_LENGTH; auto addQueueInfo = [&](xQueueHandle q, std::string qName) { queueNameMap.insert(std::pair(q, qName)); vQueueAddToRegistry(q, qName.c_str()); queues.push_back(q); }; // iterate over all entries in the list of queues and summarize queue sizes for( auto wqi : queuesList ) { setSize += wqi.length; } //create set of queues queueSet = xQueueCreateSet( setSize ); if( queueSet == NULL ) return false; //create and add all queues to the set. First service queue is created. serviceQueue = xQueueCreate(SERVICE_QUEUE_LENGTH, SERVICE_QUEUE_SIZE ); if(serviceQueue == nullptr){ deinit(); return false; } addQueueInfo(serviceQueue, SERVICE_QUEUE_NAME); // create and add all queues provided from service for (auto wqi : queuesList) { auto q = xQueueCreate(wqi.length, wqi.elementSize); if (q == NULL) { LOG_FATAL("xQueueCreate %s failed", wqi.name.c_str()); deinit(); return false; } addQueueInfo(q, wqi.name); }; //iterate over all queues and add them to set for( uint32_t i=0; iGetName()+"_w" + std::to_string(workerCount); BaseType_t task_error = xTaskCreate(workerTaskFunction, workerName.c_str(), 2048, this, service->GetPriority(), &taskHandle); if ( task_error != pdPASS) { LOG_ERROR("Failed to start the task"); return false; } return true; } bool Worker::stop() { return send( 0,NULL ); } bool Worker::handleMessage( uint32_t queueID ) { QueueHandle_t queue = queues[queueID]; //service queue if( queueID == 0 ) { WorkerCommand wcmd; if( xQueueReceive(queue, &wcmd, 0 ) != pdTRUE ) { return false; } wcmd.command = 1; //place some code here to handle messages from service } return true; } bool Worker::send( uint32_t cmd, uint32_t* data ) { if( serviceQueue != NULL ) { WorkerCommand wcmd {cmd, data }; if( xQueueSend( serviceQueue, &wcmd, portMAX_DELAY ) == pdTRUE ) return true; } return false; } xQueueHandle Worker::getQueueByName(std::string qname) { for (auto q_handle : this->queues) { if (this->queueNameMap[q_handle] == qname) return q_handle; } return nullptr; } } /* namespace sys */