cabana: use a monotonic buffer to allocate CanEvent (#29652)
* use a monotonic buffer to allocate CanEvent * set the next buffer size to 6MB * static * cleanup * use const iter * rename to insert_pos old-commit-hash: 57ad4f02f8e693639262aa3a982be8ed6c433ee0
This commit is contained in:
@@ -5,6 +5,8 @@
|
||||
|
||||
#include <QTimer>
|
||||
|
||||
static const int EVENT_NEXT_BUFFER_SIZE = 6 * 1024 * 1024; // 6MB
|
||||
|
||||
AbstractStream *can = nullptr;
|
||||
|
||||
StreamNotifier *StreamNotifier::instance() {
|
||||
@@ -12,8 +14,11 @@ StreamNotifier *StreamNotifier::instance() {
|
||||
return ¬ifier;
|
||||
}
|
||||
|
||||
AbstractStream::AbstractStream(QObject *parent) : new_msgs(new QHash<MessageId, CanData>()), QObject(parent) {
|
||||
AbstractStream::AbstractStream(QObject *parent) : QObject(parent) {
|
||||
assert(parent != nullptr);
|
||||
new_msgs = std::make_unique<QHash<MessageId, CanData>>();
|
||||
event_buffer = std::make_unique<MonotonicBuffer>(EVENT_NEXT_BUFFER_SIZE);
|
||||
|
||||
QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo);
|
||||
QObject::connect(&settings, &Settings::changed, this, &AbstractStream::updateMasks);
|
||||
QObject::connect(dbc(), &DBCManager::DBCFileChanged, this, &AbstractStream::updateMasks);
|
||||
@@ -129,37 +134,25 @@ void AbstractStream::updateLastMsgsTo(double sec) {
|
||||
}
|
||||
|
||||
void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last) {
|
||||
size_t memory_size = 0;
|
||||
size_t events_cnt = 0;
|
||||
for (auto it = first; it != last; ++it) {
|
||||
if ((*it)->which == cereal::Event::Which::CAN) {
|
||||
for (const auto &c : (*it)->event.getCan()) {
|
||||
memory_size += sizeof(CanEvent) + sizeof(uint8_t) * c.getDat().size();
|
||||
++events_cnt;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (memory_size == 0) return;
|
||||
static std::unordered_map<MessageId, std::deque<const CanEvent *>> new_events_map;
|
||||
static std::vector<const CanEvent *> new_events;
|
||||
new_events_map.clear();
|
||||
new_events.clear();
|
||||
|
||||
char *ptr = memory_blocks.emplace_back(new char[memory_size]).get();
|
||||
std::unordered_map<MessageId, std::deque<const CanEvent *>> new_events_map;
|
||||
std::vector<const CanEvent *> new_events;
|
||||
new_events.reserve(events_cnt);
|
||||
for (auto it = first; it != last; ++it) {
|
||||
if ((*it)->which == cereal::Event::Which::CAN) {
|
||||
uint64_t ts = (*it)->mono_time;
|
||||
for (const auto &c : (*it)->event.getCan()) {
|
||||
CanEvent *e = (CanEvent *)ptr;
|
||||
auto dat = c.getDat();
|
||||
CanEvent *e = (CanEvent *)event_buffer->allocate(sizeof(CanEvent) + sizeof(uint8_t) * dat.size());
|
||||
e->src = c.getSrc();
|
||||
e->address = c.getAddress();
|
||||
e->mono_time = ts;
|
||||
auto dat = c.getDat();
|
||||
e->size = dat.size();
|
||||
memcpy(e->dat, (uint8_t *)dat.begin(), e->size);
|
||||
|
||||
new_events_map[{.source = e->src, .address = e->address}].push_back(e);
|
||||
new_events.push_back(e);
|
||||
ptr += sizeof(CanEvent) + sizeof(uint8_t) * e->size;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -168,17 +161,14 @@ void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std
|
||||
return l->mono_time < r->mono_time;
|
||||
};
|
||||
|
||||
bool append = new_events.front()->mono_time > lastest_event_ts;
|
||||
for (auto &[id, new_e] : new_events_map) {
|
||||
auto &e = events_[id];
|
||||
auto pos = append ? e.end()
|
||||
: std::upper_bound(e.cbegin(), e.cend(), new_e.front(), compare);
|
||||
e.insert(pos, new_e.cbegin(), new_e.cend());
|
||||
auto insert_pos = std::upper_bound(e.cbegin(), e.cend(), new_e.front(), compare);
|
||||
e.insert(insert_pos, new_e.cbegin(), new_e.cend());
|
||||
}
|
||||
|
||||
auto pos = append ? all_events_.end()
|
||||
: std::upper_bound(all_events_.begin(), all_events_.end(), new_events.front(), compare);
|
||||
all_events_.insert(pos, new_events.cbegin(), new_events.cend());
|
||||
auto insert_pos = std::upper_bound(all_events_.cbegin(), all_events_.cend(), new_events.front(), compare);
|
||||
all_events_.insert(insert_pos, new_events.cbegin(), new_events.cend());
|
||||
|
||||
lastest_event_ts = all_events_.back()->mono_time;
|
||||
emit eventsMerged();
|
||||
|
||||
@@ -101,7 +101,7 @@ protected:
|
||||
QHash<MessageId, CanData> all_msgs;
|
||||
std::unordered_map<MessageId, std::vector<const CanEvent *>> events_;
|
||||
std::vector<const CanEvent *> all_events_;
|
||||
std::deque<std::unique_ptr<char[]>> memory_blocks;
|
||||
std::unique_ptr<MonotonicBuffer> event_buffer;
|
||||
std::mutex mutex;
|
||||
std::unordered_map<MessageId, std::vector<uint8_t>> masks;
|
||||
};
|
||||
|
||||
@@ -4,12 +4,13 @@
|
||||
#include <array>
|
||||
#include <csignal>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <sys/socket.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <QDebug>
|
||||
#include <QColor>
|
||||
#include <QDebug>
|
||||
#include <QFontDatabase>
|
||||
#include <QLocale>
|
||||
#include <QPainter>
|
||||
@@ -264,3 +265,26 @@ QString signalToolTip(const cabana::Signal *sig) {
|
||||
)").arg(sig->name).arg(sig->start_bit).arg(sig->size).arg(sig->msb).arg(sig->lsb)
|
||||
.arg(sig->is_little_endian ? "Y" : "N").arg(sig->is_signed ? "Y" : "N");
|
||||
}
|
||||
|
||||
// MonotonicBuffer
|
||||
|
||||
void *MonotonicBuffer::allocate(size_t bytes, size_t alignment) {
|
||||
assert(bytes > 0);
|
||||
void *p = std::align(alignment, bytes, current_buf, available);
|
||||
if (p == nullptr) {
|
||||
available = next_buffer_size = std::max(next_buffer_size, bytes);
|
||||
current_buf = buffers.emplace_back(std::aligned_alloc(alignment, next_buffer_size));
|
||||
next_buffer_size *= growth_factor;
|
||||
p = current_buf;
|
||||
}
|
||||
|
||||
current_buf = (char *)current_buf + bytes;
|
||||
available -= bytes;
|
||||
return p;
|
||||
}
|
||||
|
||||
MonotonicBuffer::~MonotonicBuffer() {
|
||||
for (auto buf : buffers) {
|
||||
free(buf);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <cmath>
|
||||
#include <deque>
|
||||
#include <vector>
|
||||
#include <utility>
|
||||
|
||||
@@ -153,5 +154,20 @@ private:
|
||||
QSocketNotifier *sn;
|
||||
};
|
||||
|
||||
class MonotonicBuffer {
|
||||
public:
|
||||
MonotonicBuffer(size_t initial_size) : next_buffer_size(initial_size) {}
|
||||
~MonotonicBuffer();
|
||||
void *allocate(size_t bytes, size_t alignment = 16ul);
|
||||
void deallocate(void *p) {}
|
||||
|
||||
private:
|
||||
void *current_buf = nullptr;
|
||||
size_t next_buffer_size = 0;
|
||||
size_t available = 0;
|
||||
std::deque<void *> buffers;
|
||||
static constexpr float growth_factor = 1.5;
|
||||
};
|
||||
|
||||
int num_decimals(double num);
|
||||
QString signalToolTip(const cabana::Signal *sig);
|
||||
|
||||
Reference in New Issue
Block a user