mirror of https://github.com/commaai/openpilot.git
LoggerState: test cases (#21855)
* test multiple threads writing to log
* fix typo
* use util::getenv
* check INIT_DATA and SENTINEL
* test END_OF_SEGMENT
* add comment
* test multiple threads logging and rotation
* cleanup
* update
* LoggerHandle:quick fix
* cleanup
* revert test_loggerd.py
* refactor test case
* check lock file
* check refcnt after close
* test_runner.cc
d
* int eixt_signal
old-commit-hash: 935cbd3139
This commit is contained in:
parent
467348f803
commit
0fac38e34f
|
@ -214,6 +214,7 @@ jobs:
|
|||
$UNIT_TEST selfdrive/thermald && \
|
||||
$UNIT_TEST tools/lib/tests && \
|
||||
./selfdrive/common/tests/test_util && \
|
||||
./selfdrive/loggerd/tests/test_logger &&\
|
||||
./selfdrive/proclogd/tests/test_proclog && \
|
||||
./selfdrive/camerad/test/ae_gray_test"
|
||||
- name: Upload coverage to Codecov
|
||||
|
|
|
@ -1 +1,2 @@
|
|||
loggerd
|
||||
tests/test_logger
|
||||
|
|
|
@ -26,3 +26,6 @@ if arch == "Darwin":
|
|||
|
||||
env.Program(src, LIBS=libs)
|
||||
env.Program('bootlog.cc', LIBS=libs)
|
||||
|
||||
if GetOption('test'):
|
||||
env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc'], LIBS=[libs])
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
#include <cutils/properties.h>
|
||||
#endif
|
||||
|
||||
#include "cereal/messaging/messaging.h"
|
||||
#include "selfdrive/common/params.h"
|
||||
#include "selfdrive/common/swaglog.h"
|
||||
#include "selfdrive/common/version.h"
|
||||
|
@ -132,14 +131,14 @@ void log_init_data(LoggerState *s) {
|
|||
}
|
||||
|
||||
|
||||
static void log_sentinel(LoggerState *s, cereal::Sentinel::SentinelType type, int signal=0) {
|
||||
static void lh_log_sentinel(LoggerHandle *h, SentinelType type) {
|
||||
MessageBuilder msg;
|
||||
auto sen = msg.initEvent().initSentinel();
|
||||
sen.setType(type);
|
||||
sen.setSignal(signal);
|
||||
sen.setSignal(h->exit_signal);
|
||||
auto bytes = msg.toBytes();
|
||||
|
||||
logger_log(s, bytes.begin(), bytes.size(), true);
|
||||
lh_log(h, bytes.begin(), bytes.size(), true);
|
||||
}
|
||||
|
||||
// ***** logging functions *****
|
||||
|
@ -172,6 +171,8 @@ static LoggerHandle* logger_open(LoggerState *s, const char* root_path) {
|
|||
snprintf(h->log_path, sizeof(h->log_path), "%s/%s.bz2", h->segment_path, s->log_name);
|
||||
snprintf(h->qlog_path, sizeof(h->qlog_path), "%s/qlog.bz2", h->segment_path);
|
||||
snprintf(h->lock_path, sizeof(h->lock_path), "%s.lock", h->log_path);
|
||||
h->end_sentinel_type = SentinelType::END_OF_SEGMENT;
|
||||
h->exit_signal = 0;
|
||||
|
||||
err = logger_mkpath(h->log_path);
|
||||
if (err) return NULL;
|
||||
|
@ -194,7 +195,6 @@ int logger_next(LoggerState *s, const char* root_path,
|
|||
char* out_segment_path, size_t out_segment_path_len,
|
||||
int* out_part) {
|
||||
bool is_start_of_route = !s->cur_handle;
|
||||
if (!is_start_of_route) log_sentinel(s, cereal::Sentinel::SentinelType::END_OF_SEGMENT);
|
||||
|
||||
pthread_mutex_lock(&s->lock);
|
||||
s->part++;
|
||||
|
@ -221,7 +221,7 @@ int logger_next(LoggerState *s, const char* root_path,
|
|||
|
||||
// write beggining of log metadata
|
||||
log_init_data(s);
|
||||
log_sentinel(s, is_start_of_route ? cereal::Sentinel::SentinelType::START_OF_ROUTE : cereal::Sentinel::SentinelType::START_OF_SEGMENT);
|
||||
lh_log_sentinel(s->cur_handle, is_start_of_route ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -246,11 +246,10 @@ void logger_log(LoggerState *s, uint8_t* data, size_t data_size, bool in_qlog) {
|
|||
}
|
||||
|
||||
void logger_close(LoggerState *s, ExitHandler *exit_handler) {
|
||||
int signal = exit_handler == nullptr ? 0 : exit_handler->signal.load();
|
||||
log_sentinel(s, cereal::Sentinel::SentinelType::END_OF_ROUTE, signal);
|
||||
|
||||
pthread_mutex_lock(&s->lock);
|
||||
if (s->cur_handle) {
|
||||
s->cur_handle->exit_signal = exit_handler && exit_handler->signal.load();
|
||||
s->cur_handle->end_sentinel_type = SentinelType::END_OF_ROUTE;
|
||||
lh_close(s->cur_handle);
|
||||
}
|
||||
pthread_mutex_unlock(&s->lock);
|
||||
|
@ -269,6 +268,12 @@ void lh_log(LoggerHandle* h, uint8_t* data, size_t data_size, bool in_qlog) {
|
|||
void lh_close(LoggerHandle* h) {
|
||||
pthread_mutex_lock(&h->lock);
|
||||
assert(h->refcnt > 0);
|
||||
if (h->refcnt == 1) {
|
||||
// a very ugly hack. only here can guarantee sentinel is the last msg
|
||||
pthread_mutex_unlock(&h->lock);
|
||||
lh_log_sentinel(h, h->end_sentinel_type);
|
||||
pthread_mutex_lock(&h->lock);
|
||||
}
|
||||
h->refcnt--;
|
||||
if (h->refcnt == 0) {
|
||||
h->log.reset(nullptr);
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
#include <capnp/serialize.h>
|
||||
#include <kj/array.h>
|
||||
|
||||
#include "cereal/messaging/messaging.h"
|
||||
#include "selfdrive/common/util.h"
|
||||
#include "selfdrive/common/swaglog.h"
|
||||
#include "selfdrive/hardware/hw.h"
|
||||
|
@ -56,8 +57,12 @@ class BZFile {
|
|||
BZFILE* bz_file = nullptr;
|
||||
};
|
||||
|
||||
typedef cereal::Sentinel::SentinelType SentinelType;
|
||||
|
||||
typedef struct LoggerHandle {
|
||||
pthread_mutex_t lock;
|
||||
SentinelType end_sentinel_type;
|
||||
int exit_signal;
|
||||
int refcnt;
|
||||
char segment_path[4096];
|
||||
char log_path[4096];
|
||||
|
|
|
@ -0,0 +1,172 @@
|
|||
#include <sys/stat.h>
|
||||
|
||||
#include <condition_variable>
|
||||
#include <thread>
|
||||
|
||||
#include <climits>
|
||||
|
||||
#include "catch2/catch.hpp"
|
||||
#include "cereal/messaging/messaging.h"
|
||||
#include "selfdrive/common/util.h"
|
||||
#include "selfdrive/loggerd/logger.h"
|
||||
|
||||
typedef cereal::Sentinel::SentinelType SentinelType;
|
||||
|
||||
bool decompressBZ2(std::vector<uint8_t> &dest, const char srcData[], size_t srcSize, size_t outputSizeIncrement = 0x100000U) {
|
||||
bz_stream strm = {};
|
||||
int ret = BZ2_bzDecompressInit(&strm, 0, 0);
|
||||
assert(ret == BZ_OK);
|
||||
dest.resize(1024 * 1024);
|
||||
strm.next_in = const_cast<char *>(srcData);
|
||||
strm.avail_in = srcSize;
|
||||
do {
|
||||
strm.next_out = (char *)&dest[strm.total_out_lo32];
|
||||
strm.avail_out = dest.size() - strm.total_out_lo32;
|
||||
ret = BZ2_bzDecompress(&strm);
|
||||
if (ret == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) {
|
||||
dest.resize(dest.size() + outputSizeIncrement);
|
||||
}
|
||||
} while (ret == BZ_OK && strm.avail_in > 0);
|
||||
|
||||
BZ2_bzDecompressEnd(&strm);
|
||||
dest.resize(strm.total_out_lo32);
|
||||
return ret == BZ_STREAM_END;
|
||||
}
|
||||
|
||||
void verify_segment(const std::string &route_path, int segment, int max_segment, int required_event_cnt) {
|
||||
const std::string segment_path = route_path + "--" + std::to_string(segment);
|
||||
SentinelType begin_sentinel = segment == 0 ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT;
|
||||
SentinelType end_sentinel = segment == max_segment - 1 ? SentinelType::END_OF_ROUTE : SentinelType::END_OF_SEGMENT;
|
||||
|
||||
REQUIRE(!util::file_exists(segment_path + "/rlog.bz2.lock"));
|
||||
for (const char *fn : {"/rlog.bz2", "/qlog.bz2"}) {
|
||||
const std::string log_file = segment_path + fn;
|
||||
INFO(log_file);
|
||||
std::string log_bz2 = util::read_file(log_file);
|
||||
REQUIRE(log_bz2.size() > 0);
|
||||
|
||||
std::vector<uint8_t> log;
|
||||
bool ret = decompressBZ2(log, log_bz2.data(), log_bz2.size());
|
||||
REQUIRE(ret);
|
||||
|
||||
int event_cnt = 0, i = 0;
|
||||
kj::ArrayPtr<const capnp::word> words((capnp::word *)log.data(), log.size() / sizeof(capnp::word));
|
||||
while (words.size() > 0) {
|
||||
try {
|
||||
capnp::FlatArrayMessageReader reader(words);
|
||||
auto event = reader.getRoot<cereal::Event>();
|
||||
words = kj::arrayPtr(reader.getEnd(), words.end());
|
||||
if (i == 0) {
|
||||
REQUIRE(event.which() == cereal::Event::INIT_DATA);
|
||||
} else if (i == 1) {
|
||||
REQUIRE(event.which() == cereal::Event::SENTINEL);
|
||||
REQUIRE(event.getSentinel().getType() == begin_sentinel);
|
||||
REQUIRE(event.getSentinel().getSignal() == 0);
|
||||
} else if (words.size() > 0) {
|
||||
REQUIRE(event.which() == cereal::Event::CLOCKS);
|
||||
++event_cnt;
|
||||
} else {
|
||||
// the last event must be SENTINEL
|
||||
REQUIRE(event.which() == cereal::Event::SENTINEL);
|
||||
REQUIRE(event.getSentinel().getType() == end_sentinel);
|
||||
REQUIRE(event.getSentinel().getSignal() == (end_sentinel == SentinelType::END_OF_ROUTE ? 1 : 0));
|
||||
}
|
||||
++i;
|
||||
} catch (const kj::Exception &ex) {
|
||||
INFO("failed parse " << i << " excpetion :" << ex.getDescription());
|
||||
REQUIRE(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
REQUIRE(event_cnt == required_event_cnt);
|
||||
}
|
||||
}
|
||||
|
||||
void write_msg(LoggerHandle *logger) {
|
||||
MessageBuilder msg;
|
||||
msg.initEvent().initClocks();
|
||||
auto bytes = msg.toBytes();
|
||||
lh_log(logger, bytes.begin(), bytes.size(), true);
|
||||
}
|
||||
|
||||
TEST_CASE("logger") {
|
||||
const std::string log_root = "/tmp/test_logger";
|
||||
system(("rm " + log_root + " -rf").c_str());
|
||||
|
||||
ExitHandler do_exit;
|
||||
|
||||
LoggerState logger = {};
|
||||
logger_init(&logger, "rlog", true);
|
||||
char segment_path[PATH_MAX] = {};
|
||||
int segment = -1;
|
||||
|
||||
SECTION("single thread logging & rotation(100 segments, one thread)") {
|
||||
const int segment_cnt = 100;
|
||||
for (int i = 0; i < segment_cnt; ++i) {
|
||||
REQUIRE(logger_next(&logger, log_root.c_str(), segment_path, sizeof(segment_path), &segment) == 0);
|
||||
REQUIRE(util::file_exists(std::string(segment_path) + "/rlog.bz2.lock"));
|
||||
REQUIRE(segment == i);
|
||||
write_msg(logger.cur_handle);
|
||||
}
|
||||
do_exit = true;
|
||||
do_exit.signal = 1;
|
||||
logger_close(&logger, &do_exit);
|
||||
for (int i = 0; i < segment_cnt; ++i) {
|
||||
verify_segment(log_root + "/" + logger.route_name, i, segment_cnt, 1);
|
||||
}
|
||||
}
|
||||
SECTION("multiple threads logging & rotation(100 segments, 10 threads") {
|
||||
const int segment_cnt = 100, thread_cnt = 10;
|
||||
std::atomic<int> event_cnt[segment_cnt] = {};
|
||||
std::atomic<int> main_segment = -1;
|
||||
|
||||
auto logging_thread = [&]() -> void {
|
||||
LoggerHandle *lh = logger_get_handle(&logger);
|
||||
REQUIRE(lh != nullptr);
|
||||
int segment = main_segment;
|
||||
int delayed_cnt = 0;
|
||||
while (!do_exit) {
|
||||
// write 2 more messages in the current segment and then rotate to the new segment.
|
||||
if (main_segment > segment && ++delayed_cnt == 2) {
|
||||
lh_close(lh);
|
||||
lh = logger_get_handle(&logger);
|
||||
segment = main_segment;
|
||||
delayed_cnt = 0;
|
||||
}
|
||||
write_msg(lh);
|
||||
event_cnt[segment] += 1;
|
||||
usleep(1);
|
||||
}
|
||||
lh_close(lh);
|
||||
};
|
||||
|
||||
// start logging
|
||||
std::vector<std::thread> threads;
|
||||
for (int i = 0; i < segment_cnt; ++i) {
|
||||
REQUIRE(logger_next(&logger, log_root.c_str(), segment_path, sizeof(segment_path), &segment) == 0);
|
||||
REQUIRE(segment == i);
|
||||
main_segment = segment;
|
||||
if (i == 0) {
|
||||
for (int i = 0; i < thread_cnt; ++i) {
|
||||
threads.push_back(std::thread(logging_thread));
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
write_msg(logger.cur_handle);
|
||||
usleep(1);
|
||||
}
|
||||
event_cnt[segment] += 100;
|
||||
}
|
||||
|
||||
// end logging
|
||||
for (auto &t : threads) t.join();
|
||||
do_exit = true;
|
||||
do_exit.signal = 1;
|
||||
logger_close(&logger, &do_exit);
|
||||
REQUIRE(logger.cur_handle->refcnt == 0);
|
||||
|
||||
for (int i = 0; i < segment_cnt; ++i) {
|
||||
verify_segment(log_root + "/" + logger.route_name, i, segment_cnt, event_cnt[i]);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,2 @@
|
|||
#define CATCH_CONFIG_MAIN
|
||||
#include "catch2/catch.hpp"
|
Loading…
Reference in New Issue