cabana: fixed the multi-threading issues of AbstractStream (#28590)

* fix multi-threading issues

* protect masks with mutex
old-commit-hash: e80440dc5d
This commit is contained in:
Dean Lee 2023-06-20 01:21:16 +08:00 committed by GitHub
parent fbce1c9863
commit b31de7db56
8 changed files with 47 additions and 14 deletions

View File

@ -76,7 +76,7 @@ QString cabana::Msg::newSignalName() {
}
void cabana::Msg::update() {
mask = QVector<uint8_t>(size, 0x00).toList();
mask.assign(size, 0x00);
multiplexor = nullptr;
// sort signals

View File

@ -102,7 +102,7 @@ public:
QString comment;
std::vector<cabana::Signal *> sigs;
QList<uint8_t> mask;
std::vector<uint8_t> mask;
cabana::Signal *multiplexor = nullptr;
};

View File

@ -58,6 +58,7 @@ void DBCManager::addSignal(const MessageId &id, const cabana::Signal &sig) {
if (auto m = msg(id)) {
if (auto s = m->addSignal(sig)) {
emit signalAdded(id, s);
emit maskUpdated();
}
}
}
@ -66,6 +67,7 @@ void DBCManager::updateSignal(const MessageId &id, const QString &sig_name, cons
if (auto m = msg(id)) {
if (auto s = m->updateSignal(sig_name, sig)) {
emit signalUpdated(s);
emit maskUpdated();
}
}
}
@ -75,6 +77,7 @@ void DBCManager::removeSignal(const MessageId &id, const QString &sig_name) {
if (auto s = m->sig(sig_name)) {
emit signalRemoved(s);
m->removeSignal(sig_name);
emit maskUpdated();
}
}
}
@ -91,6 +94,7 @@ void DBCManager::removeMsg(const MessageId &id) {
assert(dbc_file); // This should be impossible
dbc_file->removeMsg(id);
emit msgRemoved(id);
emit maskUpdated();
}
QString DBCManager::newMsgName(const MessageId &id) {
@ -102,8 +106,8 @@ QString DBCManager::newSignalName(const MessageId &id) {
return m ? m->newSignalName() : "";
}
const QList<uint8_t> &DBCManager::mask(const MessageId &id) {
static QList<uint8_t> empty_mask;
const std::vector<uint8_t> &DBCManager::mask(const MessageId &id) {
static std::vector<uint8_t> empty_mask;
auto m = msg(id);
return m ? m->mask : empty_mask;
}

View File

@ -32,7 +32,7 @@ public:
QString newMsgName(const MessageId &id);
QString newSignalName(const MessageId &id);
const QList<uint8_t>& mask(const MessageId &id);
const std::vector<uint8_t>& mask(const MessageId &id);
const std::map<uint32_t, cabana::Msg> &getMessages(uint8_t source);
cabana::Msg *msg(const MessageId &id);
@ -57,6 +57,7 @@ signals:
void msgUpdated(MessageId id);
void msgRemoved(MessageId id);
void DBCFileChanged();
void maskUpdated();
private:
std::map<int, std::shared_ptr<DBCFile>> dbc_files;

View File

@ -141,7 +141,6 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(InputIt first, I
}
std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_time, uint64_t min_time) {
const QList<uint8_t> mask;
const auto &events = can->events(msg_id);
const auto freq = can->lastMessage(msg_id).freq;
const bool update_colors = !display_signals_mode || sigs.empty();
@ -154,7 +153,7 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_ti
auto msgs = fetchData(first, events.rend(), min_time);
if (update_colors && (min_time > 0 || messages.empty())) {
for (auto it = msgs.rbegin(); it != msgs.rend(); ++it) {
hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, speed, mask, freq);
hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, speed, nullptr, freq);
it->colors = hex_colors.colors;
}
}
@ -167,7 +166,7 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_ti
auto msgs = fetchData(first, events.cend(), 0);
if (update_colors) {
for (auto it = msgs.begin(); it != msgs.end(); ++it) {
hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, speed, mask, freq);
hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, speed, nullptr, freq);
it->colors = hex_colors.colors;
}
}

View File

@ -83,6 +83,7 @@ MessagesWidget::MessagesWidget(QWidget *parent) : QWidget(parent) {
});
QObject::connect(suppress_defined_signals, &QCheckBox::stateChanged, [=](int state) {
settings.suppress_defined_signals = (state == Qt::Checked);
emit settings.changed();
});
QObject::connect(can, &AbstractStream::msgsReceived, model, &MessageListModel::msgsReceived);
QObject::connect(dbc(), &DBCManager::DBCFileChanged, this, &MessagesWidget::dbcModified);

View File

@ -12,6 +12,9 @@ StreamNotifier *StreamNotifier::instance() {
AbstractStream::AbstractStream(QObject *parent) : new_msgs(new QHash<MessageId, CanData>()), QObject(parent) {
assert(parent != nullptr);
QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo);
QObject::connect(&settings, &Settings::changed, this, &AbstractStream::updateMasks);
QObject::connect(dbc(), &DBCManager::DBCFileChanged, this, &AbstractStream::updateMasks);
QObject::connect(dbc(), &DBCManager::maskUpdated, this, &AbstractStream::updateMasks);
QObject::connect(this, &AbstractStream::streamStarted, [this]() {
emit StreamNotifier::instance()->changingStream();
delete can;
@ -20,6 +23,20 @@ AbstractStream::AbstractStream(QObject *parent) : new_msgs(new QHash<MessageId,
});
}
void AbstractStream::updateMasks() {
std::lock_guard lk(mutex);
masks.clear();
if (settings.suppress_defined_signals) {
for (auto s : sources) {
if (auto f = dbc()->findDBCFile(s)) {
for (const auto &[address, m] : f->getMessages()) {
masks[{.source = (uint8_t)s, .address = address}] = m.mask;
}
}
}
}
}
void AbstractStream::updateMessages(QHash<MessageId, CanData> *messages) {
auto prev_src_size = sources.size();
auto prev_msg_size = last_msgs.size();
@ -29,6 +46,7 @@ void AbstractStream::updateMessages(QHash<MessageId, CanData> *messages) {
sources.insert(id.source);
}
if (sources.size() != prev_src_size) {
updateMasks();
emit sourcesUpdated(sources);
}
emit updated();
@ -38,7 +56,9 @@ void AbstractStream::updateMessages(QHash<MessageId, CanData> *messages) {
}
void AbstractStream::updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size) {
QList<uint8_t> mask = settings.suppress_defined_signals ? dbc()->mask(id) : QList<uint8_t>();
std::lock_guard lk(mutex);
auto mask_it = masks.find(id);
std::vector<uint8_t> *mask = mask_it == masks.end() ? nullptr : &mask_it->second;
all_msgs[id].compute((const char *)data, size, sec, getSpeed(), mask);
if (!new_msgs->contains(id)) {
new_msgs->insert(id, {});
@ -47,7 +67,8 @@ void AbstractStream::updateEvent(const MessageId &id, double sec, const uint8_t
bool AbstractStream::postEvents() {
// delay posting CAN message if UI thread is busy
if (processing.exchange(true) == false) {
if (processing == false) {
processing = true;
for (auto it = new_msgs->begin(); it != new_msgs->end(); ++it) {
it.value() = all_msgs[it.key()];
}
@ -84,7 +105,8 @@ void AbstractStream::updateLastMsgsTo(double sec) {
auto it = std::lower_bound(ev.crbegin(), ev.crend(), last_ts, [](auto e, uint64_t ts) {
return e->mono_time > ts;
});
QList<uint8_t> mask = settings.suppress_defined_signals ? dbc()->mask(id) : QList<uint8_t>();
auto mask_it = masks.find(id);
std::vector<uint8_t> *mask = mask_it == masks.end() ? nullptr : &mask_it->second;
if (it != ev.crend()) {
double ts = (*it)->mono_time / 1e9 - routeStartTime();
auto &m = all_msgs[id];
@ -93,7 +115,10 @@ void AbstractStream::updateLastMsgsTo(double sec) {
m.freq = m.count / std::max(1.0, ts);
}
}
// deep copy all_msgs to last_msgs to avoid multi-threading issue.
last_msgs = all_msgs;
last_msgs.detach();
// use a timer to prevent recursive calls
QTimer::singleShot(0, [this]() {
emit updated();
@ -171,7 +196,7 @@ static inline QColor blend(const QColor &a, const QColor &b) {
return QColor((a.red() + b.red()) / 2, (a.green() + b.green()) / 2, (a.blue() + b.blue()) / 2, (a.alpha() + b.alpha()) / 2);
}
void CanData::compute(const char *can_data, const int size, double current_sec, double playback_speed, const QList<uint8_t> &mask, uint32_t in_freq) {
void CanData::compute(const char *can_data, const int size, double current_sec, double playback_speed, const std::vector<uint8_t> *mask, uint32_t in_freq) {
ts = current_sec;
++count;
const double sec_to_first_event = current_sec - (can->allEvents().front()->mono_time / 1e9 - can->routeStartTime());
@ -190,7 +215,7 @@ void CanData::compute(const char *can_data, const int size, double current_sec,
const QColor &greyish_blue = !lighter ? GREYISH_BLUE : GREYISH_BLUE_LIGHTER;
for (int i = 0; i < size; ++i) {
const uint8_t mask_byte = (i < mask.size()) ? (~mask[i]) : 0xff;
const uint8_t mask_byte = (mask && i < mask->size()) ? (~((*mask)[i])) : 0xff;
const uint8_t last = dat[i] & mask_byte;
const uint8_t cur = can_data[i] & mask_byte;
const int delta = cur - last;

View File

@ -13,7 +13,7 @@
#include "tools/replay/replay.h"
struct CanData {
void compute(const char *dat, const int size, double current_sec, double playback_speed, const QList<uint8_t> &mask, uint32_t in_freq = 0);
void compute(const char *dat, const int size, double current_sec, double playback_speed, const std::vector<uint8_t> *mask, uint32_t in_freq = 0);
double ts = 0.;
uint32_t count = 0;
@ -79,6 +79,7 @@ protected:
uint64_t lastEventMonoTime() const { return lastest_event_ts; }
void updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size);
void updateMessages(QHash<MessageId, CanData> *);
void updateMasks();
void updateLastMsgsTo(double sec);
uint64_t lastest_event_ts = 0;
@ -88,6 +89,8 @@ protected:
std::unordered_map<MessageId, std::vector<const CanEvent *>> events_;
std::vector<const CanEvent *> all_events_;
std::deque<std::unique_ptr<char[]>> memory_blocks;
std::mutex mutex;
std::unordered_map<MessageId, std::vector<uint8_t>> masks;
};
class AbstractOpenStreamWidget : public QWidget {