// Copyright (c) 2017-2020, Mudita Sp. z.o.o. All rights reserved. // For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md #include "Worker.hpp" extern "C" { #include "FreeRTOS.h" #include "task.h" } #include #include #include #include namespace sys { unsigned int Worker::count = 0; void Worker::taskAdapter(void *taskParam) { Worker *worker = static_cast(taskParam); worker->task(); } bool Worker::handleControlMessage() { std::uint8_t receivedMessage; xQueueReceive(controlQueue, &receivedMessage, 0); LOG_INFO("Handle control message: %u", receivedMessage); assert(receivedMessage < controlMessagesCount); switch (static_cast(receivedMessage)) { // stop the thread case ControlMessage::Stop: { setState(State::Stopping); } break; default: { LOG_FATAL("Unexpected control message %d received", receivedMessage); return false; } break; } return true; } void Worker::task() { QueueSetMemberHandle_t activeMember; while (getState() == State::Running) { activeMember = xQueueSelectFromSet(queueSet, portMAX_DELAY); // handle control messages from parent service if (activeMember == controlQueue) { handleControlMessage(); continue; } // find id of the queue that was activated for (uint32_t i = 0; i < queues.size(); i++) { if (queues[i] == activeMember) { handleMessage(i); } } } // inform about thread end and wait for the deletion xSemaphoreGive(joinSemaphore); setState(State::Stopped); vTaskDelete(nullptr); } Worker::Worker(sys::Service *service) : service{service} {} Worker::~Worker() { if (state != State::Destroyed) { LOG_FATAL("Calling destructor of an undestroyed worker."); } } void Worker::addQueueInfo(xQueueHandle q, std::string qName) { queueNameMap.emplace(std::make_pair(q, qName)); vQueueAddToRegistry(q, qName.c_str()); queues.push_back(q); } inline std::string Worker::getControlQueueName() const { return controlQueueNamePrefix + std::to_string(id); } bool Worker::init(std::list queuesList) { assert(state == State::New); // assign worker id taskENTER_CRITICAL(); id = count++; taskEXIT_CRITICAL(); name = service->GetName() + "_w" + std::to_string(id); // initial value is because there is always a service and control queue // to communicate with the parent service auto setSize = SERVICE_QUEUE_LENGTH + CONTROL_QUEUE_LENGTH; // iterate over all entries in the list of queues and summarize queue sizes for (auto wqi : queuesList) { setSize += wqi.length; } // create set of queues queueSet = xQueueCreateSet(setSize); if (queueSet == nullptr) { state = State::Invalid; return false; } // create and add all queues to the set. First service queue is created. serviceQueue = xQueueCreate(SERVICE_QUEUE_LENGTH, SERVICE_QUEUE_SIZE); if (serviceQueue == nullptr) { state = State::Invalid; deinit(); return false; } addQueueInfo(serviceQueue, SERVICE_QUEUE_NAME); // create control queue controlQueue = xQueueCreate(CONTROL_QUEUE_LENGTH, sizeof(std::uint8_t)); if (controlQueue == nullptr) { state = State::Invalid; deinit(); return false; } addQueueInfo(controlQueue, getControlQueueName()); // create and add all queues provided from service for (auto wqi : queuesList) { auto q = xQueueCreate(wqi.length, wqi.elementSize); if (q == nullptr) { LOG_FATAL("xQueueCreate %s failed", wqi.name.c_str()); state = State::Invalid; deinit(); return false; } addQueueInfo(q, wqi.name); }; // iterate over all queues and add them to set for (uint32_t i = 0; i < queues.size(); ++i) { if (xQueueAddToSet(queues[i], queueSet) != pdPASS) { state = State::Invalid; deinit(); return false; } } // create join semaphore joinSemaphore = xSemaphoreCreateBinary(); // state protector stateMutex = xSemaphoreCreateMutex(); // it is safe to use getState/setState methods now setState(State::Initiated); return true; } bool Worker::deinit() { // for all queues - remove from set and delete queue for (auto q : queues) { // remove queues from set xQueueRemoveFromSet(q, queueSet); // delete queue vQueueDelete(q); } queues.clear(); // delete queues set vQueueDelete((QueueHandle_t)queueSet); vSemaphoreDelete(joinSemaphore); vSemaphoreDelete(stateMutex); setState(State::Destroyed); return true; }; /** * This method starts RTOS thread that waits for incoming queue events. */ bool Worker::run() { assert(getState() == State::Initiated); runnerTask = xTaskGetCurrentTaskHandle(); BaseType_t task_error = xTaskCreate(Worker::taskAdapter, name.c_str(), defaultStackSize, this, service->GetPriority(), &taskHandle); if (task_error != pdPASS) { LOG_ERROR("Failed to start the task"); return false; } setState(State::Running); return true; } bool Worker::stop() { assert(xTaskGetCurrentTaskHandle() == runnerTask); assert(getState() == State::Running); return sendControlMessage(ControlMessage::Stop); } bool Worker::sendControlMessage(ControlMessage message) { auto messageToSend = static_cast(message); return xQueueSend(controlQueue, &messageToSend, portMAX_DELAY) == pdTRUE; } bool Worker::send(uint32_t cmd, uint32_t *data) { assert(xTaskGetCurrentTaskHandle() == runnerTask); assert(getState() == State::Running); if (serviceQueue != nullptr) { WorkerCommand workerCommand{cmd, data}; if (xQueueSend(serviceQueue, &workerCommand, portMAX_DELAY) == pdTRUE) { return true; } } return false; } xQueueHandle Worker::getQueueByName(std::string qname) { for (auto q_handle : this->queues) { if (this->queueNameMap[q_handle] == qname) return q_handle; } return nullptr; } bool Worker::join(TickType_t timeout) { assert(xTaskGetCurrentTaskHandle() == runnerTask); assert(getState() == State::Running); if (xSemaphoreTake(joinSemaphore, timeout) != pdTRUE) { return false; } while (eTaskGetState(taskHandle) != eDeleted) {} return true; } void Worker::setState(State newState) { xSemaphoreTake(stateMutex, portMAX_DELAY); state = newState; xSemaphoreGive(stateMutex); } Worker::State Worker::getState() const { State currentState; xSemaphoreTake(stateMutex, portMAX_DELAY); currentState = state; xSemaphoreGive(stateMutex); return currentState; } void Worker::close() { if (!stop() || !join()) { kill(); } deinit(); } void Worker::kill() { // do not check state - this is intentional, we want to be able to kill // a worker in case of unexpected failure without knowing its state. vTaskDelete(taskHandle); } } /* namespace sys */