mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-18 23:23:53 +08:00
286b7e58b capitalize docker command 0adfc7e77 add canErrorPersistent event c8be73d10 use github url instead of gitlab mirror 8e2d212a3 add pre-commit static analysis (#48) f27222f8f add gpsOK flag to liveLocationKalman 4bb1eb826 submaster always conflates 30838d40a C++ implementation of SubMaster and PubMaster (#42) c1a6d75d1 Fix potential segfault in MSGQPubSocket::connect (#45) 67fae6afc Use ZMQ on MacOS (#46) 01cdf832c add default values for backwards compat c96381b0d add OK flags to locationd output d589d5e3d add white panda deprecation events 856c9812d mark unused car events as deprecated 4f68db8f6 remove unnecessary new event type 9073b9b1b Library cleanup (#43) 7a786d9ce move remaining alerts to car events d6f10a4b9 add alert event type git-subtree-dir: cereal git-subtree-split: 286b7e58b5e7ede697370acc77fc7cca21d69764
163 lines
4.7 KiB
C++
163 lines
4.7 KiB
C++
#include <assert.h>
|
|
#include <time.h>
|
|
#include "messaging.hpp"
|
|
#include "services.h"
|
|
#ifdef __APPLE__
|
|
#define CLOCK_BOOTTIME CLOCK_MONOTONIC
|
|
#endif
|
|
// Returns the current CLOCK_BOOTTIME reading converted to nanoseconds.
// Returns 0 when the clock cannot be read, instead of combining the fields
// of an uninitialized timespec.
static inline uint64_t nanos_since_boot() {
  struct timespec t = {};
  if (clock_gettime(CLOCK_BOOTTIME, &t) != 0) {
    return 0;
  }
  // Convert both fields to unsigned explicitly so the arithmetic is done in
  // 64-bit unsigned space regardless of the platform's time_t/long widths.
  return static_cast<uint64_t>(t.tv_sec) * 1000000000ULL + static_cast<uint64_t>(t.tv_nsec);
}
|
|
// Looks up `name` in the `services` table by exact string comparison.
// Returns a pointer to the matching table entry, or nullptr when no entry
// has that name.
static const service *get_service(const char *name) {
  for (const auto &it : services) {
    // `it` is a reference into the table, so its address stays valid after
    // the loop ends.
    if (strcmp(it.name, name) == 0) return &it;
  }
  return nullptr;
}
|
|
// Returns true when `value` compares equal (strcmp) to any entry of `list`.
// A null `value` never matches, and null entries in `list` are skipped, so
// strcmp is never handed a null pointer.
static inline bool inList(const std::initializer_list<const char *> &list, const char *value) {
  if (value == nullptr) return false;
  for (const char *v : list) {
    if (v != nullptr && strcmp(value, v) == 0) return true;
  }
  return false;
}
|
|
|
|
class MessageContext {
|
|
public:
|
|
MessageContext() { ctx_ = Context::create(); }
|
|
~MessageContext() { delete ctx_; }
|
|
Context *ctx_;
|
|
};
|
|
// File-scope context instance; its ctx_ member is passed to
// SubSocket::create and PubSocket::create below.
MessageContext ctx;
|
|
|
|
// Per-service subscription state tracked by SubMaster.
struct SubMaster::SubMessage {
  std::string name;
  SubSocket *socket = nullptr;
  int freq = 0;  // filled from the service table entry's `frequency`

  bool updated = false;  // true when a message arrived in the latest update()
  bool alive = false;    // recomputed at the end of every update()
  bool valid = false;    // copy of the latest event's getValid()
  // Defaulted like the other flags so a default-constructed SubMessage never
  // carries an indeterminate value.
  bool ignore_alive = false;

  uint64_t rcv_time = 0;   // nanos_since_boot() reading taken in the update() that received the latest message
  uint64_t rcv_frame = 0;  // SubMaster frame counter at the latest received message

  // Raw storage in which msg_reader is placement-constructed.
  void *allocated_msg_reader = nullptr;
  capnp::FlatArrayMessageReader *msg_reader = nullptr;
  kj::Array<capnp::word> buf;  // word buffer holding a copy of the latest message
  cereal::Event::Reader event;
};
|
|
|
|
// Subscribes to every service named in `service_list`.
//
// service_list: service names; each must exist in the service table
//               (asserted).
// address:      address handed to SubSocket::create; "127.0.0.1" is used
//               when null.
// ignore_alive: names whose SubMessage gets ignore_alive = true.
SubMaster::SubMaster(const std::initializer_list<const char *> &service_list, const char *address,
                     const std::initializer_list<const char *> &ignore_alive) {
  poller_ = Poller::create();
  for (auto name : service_list) {
    const service *serv = get_service(name);
    assert(serv != nullptr);

    // NOTE(review): the trailing `true` is presumably the conflate flag of
    // SubSocket::create - confirm against its declaration.
    SubSocket *socket = SubSocket::create(ctx.ctx_, name, address ? address : "127.0.0.1", true);
    assert(socket != nullptr);
    poller_->registerSocket(socket);

    SubMessage *m = new SubMessage{
      // Record the name: all_() filters by m->name, so leaving it empty
      // makes every lookup with a non-empty service list fail.
      .name = name,
      .socket = socket,
      .freq = serv->frequency,
      .ignore_alive = inList(ignore_alive, name),
      .allocated_msg_reader = malloc(sizeof(capnp::FlatArrayMessageReader)),
      .buf = kj::heapArray<capnp::word>(1024)};
    // update() placement-constructs a reader in this storage, so it must
    // not be null.
    assert(m->allocated_msg_reader != nullptr);

    messages_[socket] = m;
    services_[name] = m;
  }
}
|
|
|
|
int SubMaster::update(int timeout) {
|
|
if (++frame_ == UINT64_MAX) frame_ = 1;
|
|
for (auto &kv : messages_) kv.second->updated = false;
|
|
|
|
int updated = 0;
|
|
auto sockets = poller_->poll(timeout);
|
|
uint64_t current_time = nanos_since_boot();
|
|
for (auto s : sockets) {
|
|
Message *msg = s->receive(true);
|
|
if (msg == nullptr) continue;
|
|
|
|
SubMessage *m = messages_.at(s);
|
|
const size_t size = (msg->getSize() / sizeof(capnp::word)) + 1;
|
|
if (m->buf.size() < size) {
|
|
m->buf = kj::heapArray<capnp::word>(size);
|
|
}
|
|
memcpy(m->buf.begin(), msg->getData(), msg->getSize());
|
|
delete msg;
|
|
|
|
if (m->msg_reader) {
|
|
m->msg_reader->~FlatArrayMessageReader();
|
|
}
|
|
m->msg_reader = new (m->allocated_msg_reader) capnp::FlatArrayMessageReader(kj::ArrayPtr<capnp::word>(m->buf.begin(), size));
|
|
m->event = m->msg_reader->getRoot<cereal::Event>();
|
|
m->updated = true;
|
|
m->rcv_time = current_time;
|
|
m->rcv_frame = frame_;
|
|
m->valid = m->event.getValid();
|
|
|
|
++updated;
|
|
}
|
|
|
|
for (auto &kv : messages_) {
|
|
SubMessage *m = kv.second;
|
|
m->alive = (m->freq <= (1e-5) || ((current_time - m->rcv_time) * (1e-9)) < (10.0 / m->freq));
|
|
}
|
|
return updated;
|
|
}
|
|
|
|
bool SubMaster::all_(const std::initializer_list<const char *> &service_list, bool valid, bool alive) {
|
|
int found = 0;
|
|
for (auto &kv : messages_) {
|
|
SubMessage *m = kv.second;
|
|
if (service_list.size() == 0 || inList(service_list, m->name.c_str())) {
|
|
found += (!valid || m->valid) && (!alive || (m->alive && !m->ignore_alive));
|
|
}
|
|
}
|
|
return service_list.size() == 0 ? found == messages_.size() : found == service_list.size();
|
|
}
|
|
|
|
void SubMaster::drain() {
|
|
while (true) {
|
|
auto polls = poller_->poll(0);
|
|
if (polls.size() == 0)
|
|
break;
|
|
|
|
for (auto sock : polls) {
|
|
Message *msg = sock->receive(true);
|
|
delete msg;
|
|
}
|
|
}
|
|
}
|
|
|
|
bool SubMaster::updated(const char *name) const { return services_.at(name)->updated; }
|
|
// Returns the stored event reader for service `name`. The lookup uses
// services_.at(), so `name` must be a subscribed service.
cereal::Event::Reader &SubMaster::operator[](const char *name) {
  SubMessage *entry = services_.at(name);
  return entry->event;
}
|
|
|
|
// Releases the poller and, for every subscription, the message reader, its
// backing storage, the socket and the SubMessage record itself.
SubMaster::~SubMaster() {
  delete poller_;

  for (auto &entry : messages_) {
    SubMessage *sub = entry.second;

    // The reader was placement-constructed, so run its destructor by hand
    // before freeing the malloc'ed storage it lives in.
    if (sub->msg_reader != nullptr) {
      sub->msg_reader->~FlatArrayMessageReader();
    }
    free(sub->allocated_msg_reader);

    delete sub->socket;
    delete sub;
  }
}
|
|
|
|
// Creates one publishing socket per entry of `service_list` and stores it
// in sockets_ under the service name. Every name must exist in the service
// table (asserted).
PubMaster::PubMaster(const std::initializer_list<const char *> &service_list) {
  for (const char *service_name : service_list) {
    assert(get_service(service_name) != nullptr);

    PubSocket *publisher = PubSocket::create(ctx.ctx_, service_name);
    assert(publisher);

    sockets_[service_name] = publisher;
  }
}
|
|
|
|
int PubMaster::send(const char *name, capnp::MessageBuilder &msg) {
|
|
auto words = capnp::messageToFlatArray(msg);
|
|
auto bytes = words.asBytes();
|
|
return send(name, bytes.begin(), bytes.size());
|
|
}
|
|
|
|
// Deletes every socket stored in sockets_.
PubMaster::~PubMaster() {
  for (auto &entry : sockets_) {
    delete entry.second;
  }
}
|