loggerd: add test case for trigger_rotate (#23038)
* test rotate * remove global LoggerdState old-commit-hash: 1d323e0fd6e6050180bdf1b0a9793a4f52c47e9a
This commit is contained in:
@@ -2,30 +2,42 @@
|
||||
|
||||
ExitHandler do_exit;
|
||||
|
||||
LoggerdState s;
|
||||
|
||||
// Handle initial encoder syncing by waiting for all encoders to reach the same frame id
|
||||
bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id) {
|
||||
if (state->camera_synced[cam_type]) return true;
|
||||
bool sync_encoders(LoggerdState *s, CameraType cam_type, uint32_t frame_id) {
|
||||
if (s->camera_synced[cam_type]) return true;
|
||||
|
||||
if (state->max_waiting > 1 && state->encoders_ready != state->max_waiting) {
|
||||
if (s->max_waiting > 1 && s->encoders_ready != s->max_waiting) {
|
||||
// add a small margin to the start frame id in case one of the encoders already dropped the next frame
|
||||
update_max_atomic(state->start_frame_id, frame_id + 2);
|
||||
if (std::exchange(state->camera_ready[cam_type], true) == false) {
|
||||
++state->encoders_ready;
|
||||
update_max_atomic(s->start_frame_id, frame_id + 2);
|
||||
if (std::exchange(s->camera_ready[cam_type], true) == false) {
|
||||
++s->encoders_ready;
|
||||
LOGE("camera %d encoder ready", cam_type);
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
if (state->max_waiting == 1) update_max_atomic(state->start_frame_id, frame_id);
|
||||
bool synced = frame_id >= state->start_frame_id;
|
||||
state->camera_synced[cam_type] = synced;
|
||||
if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, (int)state->start_frame_id, frame_id);
|
||||
if (s->max_waiting == 1) update_max_atomic(s->start_frame_id, frame_id);
|
||||
bool synced = frame_id >= s->start_frame_id;
|
||||
s->camera_synced[cam_type] = synced;
|
||||
if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, (int)s->start_frame_id, frame_id);
|
||||
return synced;
|
||||
}
|
||||
}
|
||||
|
||||
void encoder_thread(const LogCameraInfo &cam_info) {
|
||||
bool trigger_rotate_if_needed(LoggerdState *s, int cur_seg, uint32_t frame_id) {
|
||||
const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS;
|
||||
if (cur_seg >= 0 && frame_id >= ((cur_seg + 1) * frames_per_seg) + s->start_frame_id) {
|
||||
// trigger rotate and wait until the main logger has rotated to the new segment
|
||||
++s->ready_to_rotate;
|
||||
std::unique_lock lk(s->rotate_lock);
|
||||
s->rotate_cv.wait(lk, [&] {
|
||||
return s->rotate_segment > cur_seg || do_exit;
|
||||
});
|
||||
return !do_exit;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void encoder_thread(LoggerdState *s, const LogCameraInfo &cam_info) {
|
||||
set_thread_name(cam_info.filename);
|
||||
|
||||
int cur_seg = -1;
|
||||
@@ -62,37 +74,29 @@ void encoder_thread(const LogCameraInfo &cam_info) {
|
||||
if (buf == nullptr) continue;
|
||||
|
||||
if (cam_info.trigger_rotate) {
|
||||
s.last_camera_seen_tms = millis_since_boot();
|
||||
if (!sync_encoders(&s, cam_info.type, extra.frame_id)) {
|
||||
s->last_camera_seen_tms = millis_since_boot();
|
||||
if (!sync_encoders(s, cam_info.type, extra.frame_id)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// check if we're ready to rotate
|
||||
const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS;
|
||||
if (cur_seg >= 0 && extra.frame_id >= ((cur_seg+1) * frames_per_seg) + s.start_frame_id) {
|
||||
// trigger rotate and wait until the main logger has rotated to the new segment
|
||||
++s.ready_to_rotate;
|
||||
std::unique_lock lk(s.rotate_lock);
|
||||
s.rotate_cv.wait(lk, [&] {
|
||||
return s.rotate_segment > cur_seg || do_exit;
|
||||
});
|
||||
if (do_exit) break;
|
||||
}
|
||||
trigger_rotate_if_needed(s, cur_seg, extra.frame_id);
|
||||
if (do_exit) break;
|
||||
}
|
||||
|
||||
// rotate the encoder if the logger is on a newer segment
|
||||
if (s.rotate_segment > cur_seg) {
|
||||
cur_seg = s.rotate_segment;
|
||||
if (s->rotate_segment > cur_seg) {
|
||||
cur_seg = s->rotate_segment;
|
||||
|
||||
LOGW("camera %d rotate encoder to %s", cam_info.type, s.segment_path);
|
||||
LOGW("camera %d rotate encoder to %s", cam_info.type, s->segment_path);
|
||||
for (auto &e : encoders) {
|
||||
e->encoder_close();
|
||||
e->encoder_open(s.segment_path);
|
||||
e->encoder_open(s->segment_path);
|
||||
}
|
||||
if (lh) {
|
||||
lh_close(lh);
|
||||
}
|
||||
lh = logger_get_handle(&s.logger);
|
||||
lh = logger_get_handle(&s->logger);
|
||||
}
|
||||
|
||||
// encode a frame
|
||||
@@ -157,31 +161,31 @@ void clear_locks() {
|
||||
ftw(LOG_ROOT.c_str(), clear_locks_fn, 16);
|
||||
}
|
||||
|
||||
void logger_rotate() {
|
||||
void logger_rotate(LoggerdState *s) {
|
||||
{
|
||||
std::unique_lock lk(s.rotate_lock);
|
||||
std::unique_lock lk(s->rotate_lock);
|
||||
int segment = -1;
|
||||
int err = logger_next(&s.logger, LOG_ROOT.c_str(), s.segment_path, sizeof(s.segment_path), &segment);
|
||||
int err = logger_next(&s->logger, LOG_ROOT.c_str(), s->segment_path, sizeof(s->segment_path), &segment);
|
||||
assert(err == 0);
|
||||
s.rotate_segment = segment;
|
||||
s.ready_to_rotate = 0;
|
||||
s.last_rotate_tms = millis_since_boot();
|
||||
s->rotate_segment = segment;
|
||||
s->ready_to_rotate = 0;
|
||||
s->last_rotate_tms = millis_since_boot();
|
||||
}
|
||||
s.rotate_cv.notify_all();
|
||||
LOGW((s.logger.part == 0) ? "logging to %s" : "rotated to %s", s.segment_path);
|
||||
s->rotate_cv.notify_all();
|
||||
LOGW((s->logger.part == 0) ? "logging to %s" : "rotated to %s", s->segment_path);
|
||||
}
|
||||
|
||||
void rotate_if_needed() {
|
||||
if (s.ready_to_rotate == s.max_waiting) {
|
||||
logger_rotate();
|
||||
void rotate_if_needed(LoggerdState *s) {
|
||||
if (s->ready_to_rotate == s->max_waiting) {
|
||||
logger_rotate(s);
|
||||
}
|
||||
|
||||
double tms = millis_since_boot();
|
||||
if ((tms - s.last_rotate_tms) > SEGMENT_LENGTH * 1000 &&
|
||||
(tms - s.last_camera_seen_tms) > NO_CAMERA_PATIENCE &&
|
||||
if ((tms - s->last_rotate_tms) > SEGMENT_LENGTH * 1000 &&
|
||||
(tms - s->last_camera_seen_tms) > NO_CAMERA_PATIENCE &&
|
||||
!LOGGERD_TEST) {
|
||||
LOGW("no camera packet seen. auto rotating");
|
||||
logger_rotate();
|
||||
logger_rotate(s);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,6 +198,7 @@ void loggerd_thread() {
|
||||
} QlogState;
|
||||
std::unordered_map<SubSocket*, QlogState> qlog_states;
|
||||
|
||||
LoggerdState s;
|
||||
s.ctx = Context::create();
|
||||
Poller * poller = Poller::create();
|
||||
|
||||
@@ -209,7 +214,7 @@ void loggerd_thread() {
|
||||
|
||||
// init logger
|
||||
logger_init(&s.logger, "rlog", true);
|
||||
logger_rotate();
|
||||
logger_rotate(&s);
|
||||
Params().put("CurrentRoute", s.logger.route_name);
|
||||
|
||||
// init encoders
|
||||
@@ -217,7 +222,7 @@ void loggerd_thread() {
|
||||
std::vector<std::thread> encoder_threads;
|
||||
for (const auto &cam : cameras_logged) {
|
||||
if (cam.enable) {
|
||||
encoder_threads.push_back(std::thread(encoder_thread, cam));
|
||||
encoder_threads.push_back(std::thread(encoder_thread, &s, cam));
|
||||
if (cam.trigger_rotate) s.max_waiting++;
|
||||
}
|
||||
}
|
||||
@@ -236,7 +241,7 @@ void loggerd_thread() {
|
||||
bytes_count += msg->getSize();
|
||||
delete msg;
|
||||
|
||||
rotate_if_needed();
|
||||
rotate_if_needed(&s);
|
||||
|
||||
if ((++msg_count % 1000) == 0) {
|
||||
double seconds = (millis_since_boot() - start_ts) / 1000.0;
|
||||
|
||||
@@ -115,5 +115,7 @@ struct LoggerdState {
|
||||
bool camera_synced[WideRoadCam + 1] = {};
|
||||
};
|
||||
|
||||
bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id);
|
||||
bool sync_encoders(LoggerdState *s, CameraType cam_type, uint32_t frame_id);
|
||||
bool trigger_rotate_if_needed(LoggerdState *s, int cur_seg, uint32_t frame_id);
|
||||
void rotate_if_needed(LoggerdState *s);
|
||||
void loggerd_thread();
|
||||
|
||||
@@ -48,3 +48,46 @@ TEST_CASE("sync_encoders") {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const int MAX_SEGMENT_CNT = 100;
|
||||
|
||||
std::pair<int, uint32_t> encoder_thread(LoggerdState *s) {
|
||||
int cur_seg = 0;
|
||||
uint32_t frame_id = s->start_frame_id;
|
||||
|
||||
while (cur_seg < MAX_SEGMENT_CNT) {
|
||||
++frame_id;
|
||||
if (trigger_rotate_if_needed(s, cur_seg, frame_id)) {
|
||||
cur_seg = s->rotate_segment;
|
||||
}
|
||||
util::sleep_for(0);
|
||||
}
|
||||
|
||||
return {cur_seg, frame_id};
|
||||
}
|
||||
|
||||
TEST_CASE("trigger_rotate") {
|
||||
const int encoders = GENERATE(1, 2, 3);
|
||||
const int start_frame_id = random_int(0, 20);
|
||||
|
||||
LoggerdState s{
|
||||
.max_waiting = encoders,
|
||||
.start_frame_id = start_frame_id,
|
||||
};
|
||||
|
||||
std::vector<std::future<std::pair<int, uint32_t>>> futures;
|
||||
for (int i = 0; i < encoders; ++i) {
|
||||
futures.emplace_back(std::async(std::launch::async, encoder_thread, &s));
|
||||
}
|
||||
|
||||
while (s.rotate_segment < MAX_SEGMENT_CNT) {
|
||||
rotate_if_needed(&s);
|
||||
util::sleep_for(10);
|
||||
}
|
||||
|
||||
for (auto &f : futures) {
|
||||
auto [encoder_seg, frame_id] = f.get();
|
||||
REQUIRE(encoder_seg == MAX_SEGMENT_CNT);
|
||||
REQUIRE(frame_id == start_frame_id + encoder_seg * (SEGMENT_LENGTH * MAIN_FPS));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user