Files
MuditaOS/module-sys/Service/Bus.cpp
Adam Dobrowolski 2cee195e09 EGD-3056 subscription-based execution (on typeid) added and works fine
Until now there was no clear way to register a handler for a message in
a Service. With this approach one can register a function/lambda handler
for a Message, which encourages handling messages in functions rather than
in huge switch statements.
With an approach like this our Messages resemble something between the Command
and State patterns. In the State pattern, `Message` would be the `Event` sent,
whereas `Service` would be the `Context`.
2020-04-15 16:50:19 +02:00

222 lines
7.0 KiB
C++

#include "Bus.hpp"
#include "thread.hpp"
#include "ticks.hpp"
#include "critical.hpp"
#include "Service.hpp"
#include "Message.hpp"
#include <algorithm>
#include <cassert>
namespace sys
{
using namespace cpp_freertos;
using namespace std;
// Constructs the bus; the constructor performs no work of its own.
Bus::Bus() = default;
// Destroys the bus; the destructor performs no work of its own.
Bus::~Bus() = default;
// Registers a service on the bus.
//
// The service is appended to the subscriber list of every channel listed in
// its busChannels, and stored in servicesRegistered under its name (an entry
// already stored under that name is overwritten). The whole update runs
// between CriticalSection::Enter() and CriticalSection::Exit().
//
// service - the service to register
void Bus::Add(std::shared_ptr<Service> service)
{
    CriticalSection::Enter();

    for (auto &channelId : service->busChannels) {
        // operator[] creates the channel entry on first use
        auto &subscribers = channels[channelId].m_services;
        subscribers.push_back(service);
    }

    servicesRegistered[service->GetName()] = service;

    CriticalSection::Exit();
}
void Bus::Remove(std::shared_ptr<Service> service)
{
CriticalSection::Enter();
for (auto &w : service->busChannels) {
auto it = std::find_if(channels[w].m_services.begin(),
channels[w].m_services.end(),
[&](std::shared_ptr<Service> const &s) { return s == service; });
if (it != channels[w].m_services.end()) {
channels[w].m_services.erase(it);
}
else {
}
}
auto ele = servicesRegistered.find(service->GetName());
if (ele != servicesRegistered.end()) {
servicesRegistered.erase(service->GetName());
}
else {
// error!, trying to remove non-existing service
}
CriticalSection::Exit();
}
// Sends a response to the service that sent receivedMsg.
//
// The response is stamped with the responder's name, the uniID of the message
// being answered and the Unicast transmission type, then pushed to the mailbox
// of the service registered under receivedMsg->sender. If no such service is
// registered the response is silently dropped.
//
// msg         - response to deliver; must not be null
// receivedMsg - message being answered; must not be null
// s           - responding service; must not be null
void Bus::SendResponse(std::shared_ptr<Message> msg, std::shared_ptr<Message> receivedMsg, Service *s)
{
    assert(msg != nullptr);
    assert(receivedMsg != nullptr);
    assert(s != nullptr);

    msg->sender    = s->GetName();
    msg->uniID     = receivedMsg->uniID;
    msg->transType = Message::TransmissionType ::Unicast;

    // Single lookup: reuse the iterator instead of find() followed by operator[]
    const auto requester = servicesRegistered.find(receivedMsg->sender);
    if (requester == servicesRegistered.end()) {
        // silently drop it
        return;
    }
    requester->second->mailbox.push(msg);
}
bool Bus::SendUnicast(std::shared_ptr<Message> msg, const std::string &service, Service *s)
{
CriticalSection::Enter();
msg->id = uniqueMsgId++;
msg->uniID = unicastMsgId++;
CriticalSection::Exit();
msg->sender = s->GetName();
msg->transType = Message::TransmissionType ::Unicast;
auto ele = servicesRegistered.find(service);
if (ele != servicesRegistered.end()) {
servicesRegistered[service]->mailbox.push(msg);
}
else {
LOG_ERROR("Service %s doesn't exist", service.c_str());
return false;
}
return true;
}
// Sends a message to a single named service and blocks on the sender's own
// mailbox until the matching response arrives or the timeout expires.
//
// While waiting, messages that are not the awaited response are handled as
// follows: system Ping messages are executed immediately, everything else is
// set aside and pushed back to the sender's mailbox (in arrival order) just
// before this function returns.
//
// msg     - message to deliver; must not be null
// service - name of the target service
// s       - sending service, whose mailbox is polled for the response; must not be null
// timeout - maximum time to wait, in the units returned by Ticks::GetTicks()
//
// Returns:
//   {ServiceDoesntExist, nullptr} - no service is registered under `service`
//   {Success, response}           - a message carrying the request's uniID arrived
//   {Timeout, nullptr}            - no response within `timeout`; the request's
//                                   uniID is recorded in s->staleUniqueMsg
MessageRet_t Bus::SendUnicast(std::shared_ptr<Message> msg,
                              const std::string &service,
                              Service *s,
                              uint32_t timeout)
{
    assert(msg != nullptr);
    assert(s != nullptr);

    std::vector<std::shared_ptr<Message>> deferred;
    deferred.reserve(4); // reserve space for 4 elements to avoid costly memory allocations

    CriticalSection::Enter();
    const uint64_t unicastID = unicastMsgId++;
    msg->id                  = uniqueMsgId++;
    msg->uniID               = unicastID;
    CriticalSection::Exit();

    msg->sender    = s->GetName();
    msg->transType = Message::TransmissionType ::Unicast;

    // Single lookup: reuse the iterator instead of find() followed by operator[]
    const auto target = servicesRegistered.find(service);
    if (target == servicesRegistered.end()) {
        LOG_ERROR("Service %s doesn't exist", service.c_str());
        return CreateMessageRet(ReturnCodes::ServiceDoesntExist, nullptr);
    }
    target->second->mailbox.push(msg);

    // Push messages collected during waiting for response back to the processing queue
    const auto requeueDeferred = [&deferred, s]() {
        for (const auto &pending : deferred) {
            s->mailbox.push(pending);
        }
    };

    // Track elapsed ticks relative to the start instead of an absolute deadline:
    // unsigned subtraction stays correct across tick-counter wraparound, and
    // `start + timeout` can no longer overflow into an immediate bogus timeout.
    // NOTE(review): assumes the tick counter is 32 bits wide - confirm TickType_t.
    const uint32_t startTick = static_cast<uint32_t>(cpp_freertos::Ticks::GetTicks());
    uint32_t elapsed         = 0;

    while (elapsed < timeout) {
        // Block on the rx queue for the remaining time
        auto rxmsg = s->mailbox.pop(timeout - elapsed);
        elapsed    = static_cast<uint32_t>(cpp_freertos::Ticks::GetTicks()) - startTick;

        if (rxmsg == nullptr) {
            // Nothing received: treated as timeout
            break;
        }

        // Received response.
        // NOTE(review): the second condition compares the *request's* sender with
        // s->GetName(), which was assigned above, so it looks always true; possibly
        // a check on rxmsg was intended - confirm before changing.
        if ((rxmsg->uniID == unicastID) && (msg->sender == s->GetName())) {
            requeueDeferred();
            return CreateMessageRet(ReturnCodes::Success, rxmsg);
        }

        // System Ping messages are answered right away, even while waiting
        if (rxmsg->type == Message::Type::System) {
            auto *sysmsg = static_cast<SystemMessage *>(rxmsg.get());
            if (sysmsg->sysMsgType == SystemMessageType::Ping) {
                rxmsg->Execute(s);
                continue;
            }
        }

        // Any other message (system or plain data): save it for later
        deferred.push_back(rxmsg);
    }

    // Timeout
    requeueDeferred();
    // Register that we didn't receive a response (per the original note, a late
    // response is expected to be dropped based on this record)
    s->staleUniqueMsg.push_back(std::make_pair(unicastID, cpp_freertos::Ticks::GetTicks()));
    return CreateMessageRet(ReturnCodes::Timeout, nullptr);
}
// Sends a message to every service subscribed to the given channel.
//
// The message receives a fresh id and the channel (both assigned between
// CriticalSection::Enter() and Exit()), the Multicast transmission type and
// the sender's name, and is then pushed to the mailbox of each subscriber.
// A channel without an entry in `channels` has no subscribers, so nothing is
// delivered; the lookup uses find() so that sending never inserts an empty
// channel into the shared map.
//
// msg     - message to deliver
// channel - channel whose subscribers receive the message
// s       - sending service
void Bus::SendMulticast(std::shared_ptr<Message> msg, BusChannels channel, Service *s)
{
    CriticalSection::Enter();
    msg->id      = uniqueMsgId++;
    msg->channel = channel;
    CriticalSection::Exit();

    msg->transType = Message::TransmissionType ::Multicast;
    msg->sender    = s->GetName();

    const auto subscribed = channels.find(channel);
    if (subscribed == channels.end()) {
        return;
    }

    for (auto const &subscriber : subscribed->second.m_services) {
        subscriber->mailbox.push(msg);
    }
}
// Sends a message to every registered service.
//
// The message receives a fresh id (assigned between CriticalSection::Enter()
// and Exit()), the sender's name and the broadcast transmission type, and is
// then pushed to the mailbox of each entry in servicesRegistered.
//
// msg - message to deliver
// s   - sending service
void Bus::SendBroadcast(std::shared_ptr<Message> msg, Service *s)
{
    CriticalSection::Enter();
    msg->id = uniqueMsgId++;
    CriticalSection::Exit();

    msg->transType = Message::TransmissionType ::Broadcaast;
    msg->sender    = s->GetName();

    for (auto const &registered : servicesRegistered) {
        auto const &recipient = registered.second;
        recipient->mailbox.push(msg);
    }
}
// Subscriber lists, keyed by bus channel.
std::map<BusChannels, Channel> Bus::channels;
// Registered services, keyed by service name.
std::map<std::string, std::shared_ptr<Service>> Bus::servicesRegistered;
// Source of Message::id values; incremented for every message sent.
uint64_t Bus::uniqueMsgId = 0;
// Source of Message::uniID values; incremented for every unicast sent.
uint64_t Bus::unicastMsgId = 0;
} // namespace sys