mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-02-19 00:43:54 +08:00
cabana: syc the last messages after seeking (#27149)
* fix wrong last message time * update last messages after seekto * cleanup * remove src,address from CanData * merge master merge master
This commit is contained in:
@@ -79,7 +79,7 @@ void HistoryLogModel::setFilter(int sig_idx, const QString &value, std::function
|
||||
|
||||
void HistoryLogModel::updateState() {
|
||||
if (!msg_id.isEmpty()) {
|
||||
uint64_t current_time = (can->currentSec() + can->routeStartTime()) * 1e9;
|
||||
uint64_t current_time = (can->lastMessage(msg_id).ts + can->routeStartTime()) * 1e9 + 1;
|
||||
auto new_msgs = dynamic_mode ? fetchData(current_time, last_fetch_time) : fetchData(0);
|
||||
if ((has_more_data = !new_msgs.empty())) {
|
||||
beginInsertRows({}, 0, new_msgs.size() - 1);
|
||||
@@ -109,7 +109,7 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(InputIt first, I
|
||||
for (auto it = first; it != last && (*it)->mono_time > min_time; ++it) {
|
||||
if ((*it)->which == cereal::Event::Which::CAN) {
|
||||
for (const auto &c : (*it)->event.getCan()) {
|
||||
if (src == c.getSrc() && address == c.getAddress()) {
|
||||
if (address == c.getAddress() && src == c.getSrc()) {
|
||||
const auto dat = c.getDat();
|
||||
for (int i = 0; i < sigs.size(); ++i) {
|
||||
values[i] = get_raw_value((uint8_t *)dat.begin(), dat.size(), *(sigs[i]));
|
||||
@@ -140,7 +140,7 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_ti
|
||||
if (dynamic_mode) {
|
||||
auto first = std::upper_bound(events->rbegin(), events->rend(), from_time, [=](uint64_t ts, auto &e) { return e->mono_time < ts; });
|
||||
auto msgs = fetchData(first, events->rend(), min_time);
|
||||
if (update_colors && min_time > 0) {
|
||||
if (update_colors && (min_time > 0 || messages.empty())) {
|
||||
for (auto it = msgs.rbegin(); it != msgs.rend(); ++it) {
|
||||
hex_colors.compute(it->data, it->mono_time / (double)1e9, freq);
|
||||
it->colors = hex_colors.colors;
|
||||
|
||||
@@ -106,11 +106,12 @@ QVariant MessageListModel::data(const QModelIndex &index, int role) const {
|
||||
}
|
||||
} else if (role == Qt::UserRole && index.column() == 4) {
|
||||
QList<QVariant> colors;
|
||||
colors.reserve(can_data.dat.size());
|
||||
for (int i = 0; i < can_data.dat.size(); i++){
|
||||
if (suppressed_bytes.contains({id, i})) {
|
||||
colors.append(QColor(255, 255, 255, 0));
|
||||
} else {
|
||||
colors.append(can_data.colors[i]);
|
||||
colors.append(i < can_data.colors.size() ? can_data.colors[i] : QColor(255, 255, 255, 0));
|
||||
}
|
||||
}
|
||||
return colors;
|
||||
@@ -152,8 +153,8 @@ void MessageListModel::sortMessages() {
|
||||
});
|
||||
} else if (sort_column == 1) {
|
||||
std::sort(msgs.begin(), msgs.end(), [this](auto &l, auto &r) {
|
||||
auto ll = std::tuple{can->lastMessage(l).src, can->lastMessage(l).address, l};
|
||||
auto rr = std::tuple{can->lastMessage(r).src, can->lastMessage(r).address, r};
|
||||
auto ll = DBCManager::parseId(l);
|
||||
auto rr = DBCManager::parseId(r);
|
||||
return sort_order == Qt::AscendingOrder ? ll < rr : ll > rr;
|
||||
});
|
||||
} else if (sort_column == 2) {
|
||||
|
||||
@@ -4,7 +4,9 @@ AbstractStream *can = nullptr;
|
||||
|
||||
AbstractStream::AbstractStream(QObject *parent, bool is_live_streaming) : is_live_streaming(is_live_streaming), QObject(parent) {
|
||||
can = this;
|
||||
new_msgs = std::make_unique<QHash<QString, CanData>>();
|
||||
QObject::connect(this, &AbstractStream::received, this, &AbstractStream::process, Qt::QueuedConnection);
|
||||
QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo);
|
||||
}
|
||||
|
||||
void AbstractStream::process(QHash<QString, CanData> *messages) {
|
||||
@@ -18,30 +20,17 @@ void AbstractStream::process(QHash<QString, CanData> *messages) {
|
||||
}
|
||||
|
||||
bool AbstractStream::updateEvent(const Event *event) {
|
||||
static std::unique_ptr new_msgs = std::make_unique<QHash<QString, CanData>>();
|
||||
static QHash<QString, ChangeTracker> change_trackers;
|
||||
static double prev_update_ts = 0;
|
||||
|
||||
if (event->which == cereal::Event::Which::CAN) {
|
||||
double current_sec = currentSec();
|
||||
if (counters_begin_sec == 0 || counters_begin_sec >= current_sec) {
|
||||
new_msgs->clear();
|
||||
counters.clear();
|
||||
counters_begin_sec = current_sec;
|
||||
}
|
||||
|
||||
auto can_events = event->event.getCan();
|
||||
for (const auto &c : can_events) {
|
||||
double current_sec = event->mono_time / 1e9 - routeStartTime();
|
||||
for (const auto &c : event->event.getCan()) {
|
||||
QString id = QString("%1:%2").arg(c.getSrc()).arg(c.getAddress(), 1, 16);
|
||||
CanData &data = (*new_msgs)[id];
|
||||
data.ts = current_sec;
|
||||
data.src = c.getSrc();
|
||||
data.address = c.getAddress();
|
||||
data.dat = QByteArray((char *)c.getDat().begin(), c.getDat().size());
|
||||
data.count = ++counters[id];
|
||||
if (double delta = (current_sec - counters_begin_sec); delta > 0) {
|
||||
data.freq = data.count / delta;
|
||||
}
|
||||
data.freq = data.count / std::max(1.0, current_sec);
|
||||
change_trackers[id].compute(data.dat, data.ts, data.freq);
|
||||
data.colors = change_trackers[id].colors;
|
||||
data.last_change_t = change_trackers[id].last_change_t;
|
||||
@@ -60,3 +49,46 @@ bool AbstractStream::updateEvent(const Event *event) {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
const CanData &AbstractStream::lastMessage(const QString &id) {
|
||||
static CanData empty_data;
|
||||
auto it = can_msgs.find(id);
|
||||
return it != can_msgs.end() ? it.value() : empty_data;
|
||||
}
|
||||
|
||||
void AbstractStream::updateLastMsgsTo(double sec) {
|
||||
QHash<std::pair<uint8_t, uint32_t>, CanData> last_msgs; // Much faster than QHash<String, CanData>
|
||||
last_msgs.reserve(can_msgs.size());
|
||||
double route_start_time = routeStartTime();
|
||||
uint64_t last_ts = (sec + route_start_time) * 1e9;
|
||||
auto last = std::upper_bound(events()->rbegin(), events()->rend(), last_ts, [](uint64_t ts, auto &e) { return e->mono_time < ts; });
|
||||
for (auto it = last; it != events()->rend(); ++it) {
|
||||
if ((*it)->which == cereal::Event::Which::CAN) {
|
||||
for (const auto &c : (*it)->event.getCan()) {
|
||||
auto &m = last_msgs[{c.getSrc(), c.getAddress()}];
|
||||
if (++m.count == 1) {
|
||||
m.ts = ((*it)->mono_time / 1e9) - route_start_time;
|
||||
m.dat = QByteArray((char *)c.getDat().begin(), c.getDat().size());
|
||||
m.colors = QVector<QColor>(m.dat.size(), QColor(0, 0, 0, 0));
|
||||
m.last_change_t = QVector<double>(m.dat.size(), m.ts);
|
||||
} else {
|
||||
m.freq = m.count / std::max(1.0, m.ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// it is thread safe to update data here.
|
||||
// updateEvent will not be called before replayStream::seekedTo return.
|
||||
new_msgs->clear();
|
||||
change_trackers.clear();
|
||||
counters.clear();
|
||||
can_msgs.clear();
|
||||
for (auto it = last_msgs.cbegin(); it != last_msgs.cend(); ++it) {
|
||||
QString msg_id = QString("%1:%2").arg(it.key().first).arg(it.key().second, 1, 16);
|
||||
can_msgs[msg_id] = it.value();
|
||||
counters[msg_id] = it.value().count;
|
||||
}
|
||||
emit updated();
|
||||
emit msgsReceived(&can_msgs);
|
||||
}
|
||||
|
||||
@@ -11,8 +11,6 @@
|
||||
|
||||
struct CanData {
|
||||
double ts = 0.;
|
||||
uint8_t src = 0;
|
||||
uint32_t address = 0;
|
||||
uint32_t count = 0;
|
||||
uint32_t freq = 0;
|
||||
QByteArray dat;
|
||||
@@ -34,7 +32,7 @@ public:
|
||||
virtual double routeStartTime() const { return 0; }
|
||||
virtual double currentSec() const = 0;
|
||||
virtual QDateTime currentDateTime() const { return {}; }
|
||||
virtual const CanData &lastMessage(const QString &id) { return can_msgs[id]; }
|
||||
virtual const CanData &lastMessage(const QString &id);
|
||||
virtual VisionStreamType visionStreamType() const { return VISION_STREAM_ROAD; }
|
||||
virtual const Route *route() const { return nullptr; }
|
||||
virtual const std::vector<Event *> *events() const = 0;
|
||||
@@ -54,16 +52,18 @@ signals:
|
||||
void received(QHash<QString, CanData> *);
|
||||
|
||||
public:
|
||||
QMap<QString, CanData> can_msgs;
|
||||
QHash<QString, CanData> can_msgs;
|
||||
|
||||
protected:
|
||||
void process(QHash<QString, CanData> *);
|
||||
bool updateEvent(const Event *event);
|
||||
void updateLastMsgsTo(double sec);
|
||||
|
||||
bool is_live_streaming = false;
|
||||
std::atomic<double> counters_begin_sec = 0;
|
||||
std::atomic<bool> processing = false;
|
||||
QHash<QString, uint32_t> counters;
|
||||
std::unique_ptr<QHash<QString, CanData>> new_msgs;
|
||||
QHash<QString, ChangeTracker> change_trackers;
|
||||
};
|
||||
|
||||
// A global pointer referring to the unique AbstractStream object
|
||||
|
||||
@@ -42,12 +42,6 @@ bool ReplayStream::eventFilter(const Event *event) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void ReplayStream::seekTo(double ts) {
|
||||
replay->seekTo(std::max(double(0), ts), false);
|
||||
counters_begin_sec = 0;
|
||||
emit updated();
|
||||
}
|
||||
|
||||
void ReplayStream::pause(bool pause) {
|
||||
replay->pause(pause);
|
||||
emit(pause ? paused() : resume());
|
||||
|
||||
@@ -12,7 +12,7 @@ public:
|
||||
~ReplayStream();
|
||||
bool loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags = REPLAY_FLAG_NONE);
|
||||
bool eventFilter(const Event *event);
|
||||
void seekTo(double ts) override;
|
||||
void seekTo(double ts) override { replay->seekTo(std::max(double(0), ts), false); };
|
||||
inline QString routeName() const override { return replay->route()->name(); }
|
||||
inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); }
|
||||
inline VisionStreamType visionStreamType() const override { return replay->hasFlag(REPLAY_FLAG_ECAM) ? VISION_STREAM_WIDE_ROAD : VISION_STREAM_ROAD; }
|
||||
|
||||
@@ -114,9 +114,9 @@ void Replay::seekTo(double seconds, bool relative) {
|
||||
rInfo("seeking to %d s, segment %d", (int)seconds, seg);
|
||||
current_segment_ = seg;
|
||||
cur_mono_time_ = route_start_ts_ + seconds * 1e9;
|
||||
emit seekedTo(seconds);
|
||||
return isSegmentMerged(seg);
|
||||
});
|
||||
emit seekedTo(seconds);
|
||||
queueSegment();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user