mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-02-18 21:14:01 +08:00
process_replay: modeld and dmonitoringmodeld support (#28463)
* vipc support. modeld and dmonitoringmodeld configs * Update cereal * Add vision meta * Use vision meta in process replay * Exclude modeld and dmonitoringmodeld from test_processes * Refactor other pieces of code to use vision meta * remove unnecessary imports * Add MAX_FRAMES_PER_SEGMENT * cameraState migration * Move camera state migration to migration module * model_replay refactor * Fix lint issues * Reduce number of vipc buffers to 2 * Update spinner message * Update model replay ref * Support the new log order * Replace CameraFrameSizes with dict. Add encode_index field * Migrate camera states based on encode indices * Refactor process replay * Refactor model replay * Remove unnecessary print * Update model replay ref * Make camera state migration optional * Remove unused import * Add ignore_alive_pubs field * Add assertion for carParams inside fingerprinting callback * Remove WideCameraOnly param setup * Refactor ModeldCameraSyncRcvCallback * Update model replay ref commit * Fix camera state migration * Only migrate camera states when process has vision pubs * Refactor model replay again * Fix linter issues * One more linter fix * Update model replay ref commit
This commit is contained in:
@@ -9,10 +9,7 @@ from cereal.visionipc import VisionIpcServer, VisionStreamType
|
||||
from common.transformations.camera import tici_f_frame_size
|
||||
from common.realtime import DT_MDL
|
||||
from selfdrive.manager.process_config import managed_processes
|
||||
|
||||
|
||||
VIPC_STREAM = {"roadCameraState": VisionStreamType.VISION_STREAM_ROAD, "driverCameraState": VisionStreamType.VISION_STREAM_DRIVER,
|
||||
"wideRoadCameraState": VisionStreamType.VISION_STREAM_WIDE_ROAD}
|
||||
from selfdrive.test.process_replay.vision_meta import meta_from_camera_state
|
||||
|
||||
IMG = np.zeros(int(tici_f_frame_size[0]*tici_f_frame_size[1]*(3/2)), dtype=np.uint8)
|
||||
IMG_BYTES = IMG.flatten().tobytes()
|
||||
@@ -48,9 +45,10 @@ class TestModeld(unittest.TestCase):
|
||||
cs.frameId = frame_id
|
||||
cs.timestampSof = int((frame_id * DT_MDL) * 1e9)
|
||||
cs.timestampEof = int(cs.timestampSof + (DT_MDL * 1e9))
|
||||
cam_meta = meta_from_camera_state(cam)
|
||||
|
||||
self.pm.send(msg.which(), msg)
|
||||
self.vipc_server.send(VIPC_STREAM[msg.which()], IMG_BYTES, cs.frameId,
|
||||
self.vipc_server.send(cam_meta.stream, IMG_BYTES, cs.frameId,
|
||||
cs.timestampSof, cs.timestampEof)
|
||||
return cs
|
||||
|
||||
|
||||
@@ -1,13 +1,57 @@
|
||||
from collections import defaultdict
|
||||
|
||||
from cereal import messaging
|
||||
from selfdrive.test.process_replay.vision_meta import meta_from_encode_index
|
||||
|
||||
|
||||
def migrate_all(lr, old_logtime=False):
|
||||
def migrate_all(lr, old_logtime=False, camera_states=False):
|
||||
msgs = migrate_sensorEvents(lr, old_logtime)
|
||||
msgs = migrate_carParams(msgs, old_logtime)
|
||||
if camera_states:
|
||||
msgs = migrate_cameraStates(msgs)
|
||||
|
||||
return msgs
|
||||
|
||||
|
||||
def migrate_cameraStates(lr):
|
||||
all_msgs = []
|
||||
frame_to_encode_id = defaultdict(dict)
|
||||
|
||||
for msg in lr:
|
||||
if msg.which() not in ["roadEncodeIdx", "wideRoadEncodeIdx", "driverEncodeIdx"]:
|
||||
continue
|
||||
|
||||
encode_index = getattr(msg, msg.which())
|
||||
meta = meta_from_encode_index(msg.which())
|
||||
|
||||
assert encode_index.segmentId < 1200, f"Encoder index segmentId greater that 1200: {msg.which()} {encode_index.segmentId}"
|
||||
frame_to_encode_id[meta.camera_state][encode_index.frameId] = encode_index.segmentId
|
||||
|
||||
for msg in lr:
|
||||
if msg.which() not in ["roadCameraState", "wideRoadCameraState", "driverCameraState"]:
|
||||
all_msgs.append(msg)
|
||||
continue
|
||||
|
||||
camera_state = getattr(msg, msg.which())
|
||||
encode_id = frame_to_encode_id[msg.which()].get(camera_state.frameId)
|
||||
if encode_id is None:
|
||||
print(f"Missing encoded frame for camera feed {msg.which()} with frameId: {camera_state.frameId}")
|
||||
continue
|
||||
|
||||
new_msg = messaging.new_message(msg.which())
|
||||
new_camera_state = getattr(new_msg, new_msg.which())
|
||||
new_camera_state.frameId = encode_id
|
||||
new_camera_state.encodeId = encode_id
|
||||
new_camera_state.timestampSof = camera_state.timestampSof
|
||||
new_camera_state.timestampEof = camera_state.timestampEof
|
||||
new_msg.logMonoTime = msg.logMonoTime
|
||||
new_msg.valid = msg.valid
|
||||
|
||||
all_msgs.append(new_msg.as_reader())
|
||||
|
||||
return all_msgs
|
||||
|
||||
|
||||
def migrate_carParams(lr, old_logtime=False):
|
||||
all_msgs = []
|
||||
for msg in lr:
|
||||
|
||||
@@ -4,18 +4,15 @@ import sys
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from typing import Any
|
||||
from itertools import zip_longest
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from cereal.visionipc import VisionIpcServer, VisionStreamType
|
||||
from common.spinner import Spinner
|
||||
from common.timeout import Timeout
|
||||
from common.transformations.camera import tici_f_frame_size, tici_d_frame_size
|
||||
from system.hardware import PC
|
||||
from selfdrive.manager.process_config import managed_processes
|
||||
from selfdrive.test.openpilotci import BASE_URL, get_url
|
||||
from selfdrive.test.process_replay.compare_logs import compare_logs
|
||||
from selfdrive.test.process_replay.test_processes import format_diff
|
||||
from selfdrive.test.process_replay.process_replay import get_process_config, replay_process
|
||||
from system.version import get_commit
|
||||
from tools.lib.framereader import FrameReader
|
||||
from tools.lib.logreader import LogReader
|
||||
@@ -27,20 +24,30 @@ MAX_FRAMES = 100 if PC else 600
|
||||
NAV_FRAMES = 50
|
||||
|
||||
NO_NAV = "NO_NAV" in os.environ
|
||||
SEND_EXTRA_INPUTS = bool(os.getenv("SEND_EXTRA_INPUTS", "0"))
|
||||
SEND_EXTRA_INPUTS = bool(int(os.getenv("SEND_EXTRA_INPUTS", "0")))
|
||||
|
||||
VIPC_STREAM = {"roadCameraState": VisionStreamType.VISION_STREAM_ROAD, "driverCameraState": VisionStreamType.VISION_STREAM_DRIVER,
|
||||
"wideRoadCameraState": VisionStreamType.VISION_STREAM_WIDE_ROAD}
|
||||
|
||||
def get_log_fn(ref_commit, test_route):
|
||||
return f"{test_route}_model_tici_{ref_commit}.bz2"
|
||||
|
||||
|
||||
def replace_calib(msg, calib):
|
||||
msg = msg.as_builder()
|
||||
if calib is not None:
|
||||
msg.liveCalibration.rpyCalib = calib.tolist()
|
||||
return msg
|
||||
def trim_logs_to_max_frames(logs, max_frames, frs_types, include_all_types):
|
||||
all_msgs = []
|
||||
cam_state_counts = defaultdict(int)
|
||||
# keep adding messages until cam states are equal to MAX_FRAMES
|
||||
for msg in sorted(logs, key=lambda m: m.logMonoTime):
|
||||
all_msgs.append(msg)
|
||||
if msg.which() in frs_types:
|
||||
cam_state_counts[msg.which()] += 1
|
||||
|
||||
if all(cam_state_counts[state] == max_frames for state in frs_types):
|
||||
break
|
||||
|
||||
if len(include_all_types) != 0:
|
||||
other_msgs = [m for m in logs if m.which() in include_all_types]
|
||||
all_msgs.extend(other_msgs)
|
||||
|
||||
return all_msgs
|
||||
|
||||
|
||||
def nav_model_replay(lr):
|
||||
@@ -102,102 +109,38 @@ def model_replay(lr, frs):
|
||||
else:
|
||||
spinner = None
|
||||
|
||||
vipc_server = VisionIpcServer("camerad")
|
||||
vipc_server.create_buffers(VisionStreamType.VISION_STREAM_ROAD, 40, False, *(tici_f_frame_size))
|
||||
vipc_server.create_buffers(VisionStreamType.VISION_STREAM_DRIVER, 40, False, *(tici_d_frame_size))
|
||||
vipc_server.create_buffers(VisionStreamType.VISION_STREAM_WIDE_ROAD, 40, False, *(tici_f_frame_size))
|
||||
vipc_server.start_listener()
|
||||
log_msgs = []
|
||||
|
||||
sm = messaging.SubMaster(['modelV2', 'driverStateV2'])
|
||||
pm = messaging.PubMaster(['roadCameraState', 'wideRoadCameraState', 'driverCameraState', 'liveCalibration', 'lateralPlan'])
|
||||
# modeld is using frame pairs
|
||||
modeld_logs = trim_logs_to_max_frames(lr, MAX_FRAMES, {"roadCameraState", "wideRoadCameraState"}, {"roadEncodeIdx", "wideRoadEncodeIdx"})
|
||||
dmodeld_logs = trim_logs_to_max_frames(lr, MAX_FRAMES, {"driverCameraState"}, {"driverEncodeIdx"})
|
||||
if not SEND_EXTRA_INPUTS:
|
||||
modeld_logs = [msg for msg in modeld_logs if msg.which() not in ["liveCalibration", "lateralPlan"]]
|
||||
dmodeld_logs = [msg for msg in dmodeld_logs if msg.which() not in ["liveCalibration", "lateralPlan"]]
|
||||
# initial calibration
|
||||
cal_msg = next(msg for msg in lr if msg.which() == "liveCalibration").as_builder()
|
||||
cal_msg.logMonoTime = lr[0].logMonoTime
|
||||
modeld_logs.insert(0, cal_msg.as_reader())
|
||||
dmodeld_logs.insert(0, cal_msg.as_reader())
|
||||
|
||||
modeld = get_process_config("modeld")
|
||||
dmonitoringmodeld = get_process_config("dmonitoringmodeld")
|
||||
|
||||
try:
|
||||
managed_processes['modeld'].start()
|
||||
managed_processes['dmonitoringmodeld'].start()
|
||||
time.sleep(5)
|
||||
sm.update(1000)
|
||||
|
||||
log_msgs = []
|
||||
last_desire = None
|
||||
recv_cnt = defaultdict(int)
|
||||
frame_idxs = defaultdict(int)
|
||||
|
||||
# init modeld with valid calibration
|
||||
cal_msgs = [msg for msg in lr if msg.which() == "liveCalibration"]
|
||||
for _ in range(5):
|
||||
pm.send(cal_msgs[0].which(), cal_msgs[0].as_builder())
|
||||
time.sleep(0.1)
|
||||
|
||||
msgs = defaultdict(list)
|
||||
for msg in lr:
|
||||
msgs[msg.which()].append(msg)
|
||||
|
||||
for cam_msgs in zip_longest(msgs['roadCameraState'], msgs['wideRoadCameraState'], msgs['driverCameraState']):
|
||||
# need a pair of road/wide msgs
|
||||
if None in (cam_msgs[0], cam_msgs[1]):
|
||||
break
|
||||
|
||||
for msg in cam_msgs:
|
||||
if msg is None:
|
||||
continue
|
||||
|
||||
if SEND_EXTRA_INPUTS:
|
||||
if msg.which() == "liveCalibration":
|
||||
last_calib = list(msg.liveCalibration.rpyCalib)
|
||||
pm.send(msg.which(), replace_calib(msg, last_calib))
|
||||
elif msg.which() == "lateralPlan":
|
||||
last_desire = msg.lateralPlan.desire
|
||||
dat = messaging.new_message('lateralPlan')
|
||||
dat.lateralPlan.desire = last_desire
|
||||
pm.send('lateralPlan', dat)
|
||||
|
||||
if msg.which() in VIPC_STREAM:
|
||||
msg = msg.as_builder()
|
||||
camera_state = getattr(msg, msg.which())
|
||||
img = frs[msg.which()].get(frame_idxs[msg.which()], pix_fmt="nv12")[0]
|
||||
frame_idxs[msg.which()] += 1
|
||||
|
||||
# send camera state and frame
|
||||
camera_state.frameId = frame_idxs[msg.which()]
|
||||
pm.send(msg.which(), msg)
|
||||
vipc_server.send(VIPC_STREAM[msg.which()], img.flatten().tobytes(), camera_state.frameId,
|
||||
camera_state.timestampSof, camera_state.timestampEof)
|
||||
|
||||
recv = None
|
||||
if msg.which() in ('roadCameraState', 'wideRoadCameraState'):
|
||||
if min(frame_idxs['roadCameraState'], frame_idxs['wideRoadCameraState']) > recv_cnt['modelV2']:
|
||||
recv = "modelV2"
|
||||
elif msg.which() == 'driverCameraState':
|
||||
recv = "driverStateV2"
|
||||
|
||||
# wait for a response
|
||||
with Timeout(15, f"timed out waiting for {recv}"):
|
||||
if recv:
|
||||
recv_cnt[recv] += 1
|
||||
log_msgs.append(messaging.recv_one(sm.sock[recv]))
|
||||
|
||||
if spinner:
|
||||
spinner.update("replaying models: road %d/%d, driver %d/%d" % (frame_idxs['roadCameraState'],
|
||||
frs['roadCameraState'].frame_count, frame_idxs['driverCameraState'], frs['driverCameraState'].frame_count))
|
||||
|
||||
|
||||
if any(frame_idxs[c] >= frs[c].frame_count for c in frame_idxs.keys()) or frame_idxs['roadCameraState'] == MAX_FRAMES:
|
||||
break
|
||||
else:
|
||||
print(f'Received {frame_idxs["roadCameraState"]} frames')
|
||||
|
||||
if spinner:
|
||||
spinner.update("running model replay")
|
||||
modeld_msgs = replay_process(modeld, modeld_logs, frs)
|
||||
dmonitoringmodeld_msgs = replay_process(dmonitoringmodeld, dmodeld_logs, frs)
|
||||
log_msgs.extend([m for m in modeld_msgs if m.which() == "modelV2"])
|
||||
log_msgs.extend([m for m in dmonitoringmodeld_msgs if m.which() == "driverStateV2"])
|
||||
finally:
|
||||
if spinner:
|
||||
spinner.close()
|
||||
managed_processes['modeld'].stop()
|
||||
managed_processes['dmonitoringmodeld'].stop()
|
||||
|
||||
|
||||
return log_msgs
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
update = "--update" in sys.argv
|
||||
replay_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
ref_commit_fn = os.path.join(replay_dir, "model_replay_ref_commit")
|
||||
@@ -247,10 +190,18 @@ if __name__ == "__main__":
|
||||
ref_commit = f.read().strip()
|
||||
log_fn = get_log_fn(ref_commit, TEST_ROUTE)
|
||||
try:
|
||||
expected_msgs = 2*MAX_FRAMES
|
||||
all_logs = list(LogReader(BASE_URL + log_fn))
|
||||
cmp_log = []
|
||||
|
||||
# logs are ordered based on type: modelV2, driverStateV2, nav messages (navThumbnail, mapRenderState, navModel)
|
||||
model_start_index = next(i for i, m in enumerate(all_logs) if m.which() == "modelV2")
|
||||
cmp_log += all_logs[model_start_index:model_start_index + MAX_FRAMES]
|
||||
dmon_start_index = next(i for i, m in enumerate(all_logs) if m.which() == "driverStateV2")
|
||||
cmp_log += all_logs[dmon_start_index:dmon_start_index + MAX_FRAMES]
|
||||
if not NO_NAV:
|
||||
expected_msgs += NAV_FRAMES*3
|
||||
cmp_log = list(LogReader(BASE_URL + log_fn))[:expected_msgs]
|
||||
nav_start_index = next(i for i, m in enumerate(all_logs) if m.which() in ["navThumbnail", "mapRenderState", "navModel"])
|
||||
nav_logs = all_logs[nav_start_index:nav_start_index + NAV_FRAMES*3]
|
||||
cmp_log += nav_logs
|
||||
|
||||
ignore = [
|
||||
'logMonoTime',
|
||||
|
||||
@@ -1 +1 @@
|
||||
f3595e5f0d3e04c68612e1fd639607146e23c8ad
|
||||
965fa8cc8c131a8978c142813658b724a519ac9e
|
||||
|
||||
@@ -12,6 +12,7 @@ import capnp
|
||||
import cereal.messaging as messaging
|
||||
from cereal import car
|
||||
from cereal.services import service_list
|
||||
from cereal.visionipc import VisionIpcServer, get_endpoint_name as vipc_get_endpoint_name
|
||||
from common.params import Params
|
||||
from common.timeout import Timeout
|
||||
from common.realtime import DT_CTRL
|
||||
@@ -19,6 +20,7 @@ from panda.python import ALTERNATIVE_EXPERIENCE
|
||||
from selfdrive.car.car_helpers import get_car, interfaces
|
||||
from selfdrive.manager.process_config import managed_processes
|
||||
from selfdrive.test.process_replay.helpers import OpenpilotPrefix
|
||||
from selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams
|
||||
from selfdrive.test.process_replay.migration import migrate_all
|
||||
from tools.lib.logreader import LogReader
|
||||
|
||||
@@ -29,24 +31,24 @@ TIMEOUT = 15
|
||||
PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/")
|
||||
|
||||
|
||||
class ReplayContext:
|
||||
def __init__(self, cfg):
|
||||
self.proc_name = cfg.proc_name
|
||||
self.pubs = cfg.pubs
|
||||
self.drained_pub = cfg.drained_pub
|
||||
assert(len(self.pubs) != 0 or self.drained_pub is not None)
|
||||
self.main_pub = cfg.main_pub
|
||||
self.main_pub_drained = cfg.main_pub_drained
|
||||
assert(len(self.pubs) != 0 or self.main_pub is not None)
|
||||
|
||||
def __enter__(self):
|
||||
messaging.toggle_fake_events(True)
|
||||
messaging.set_fake_prefix(self.proc_name)
|
||||
|
||||
if self.drained_pub is None:
|
||||
if self.main_pub is None:
|
||||
self.events = OrderedDict()
|
||||
for pub in self.pubs:
|
||||
self.events[pub] = messaging.fake_event_handle(pub, enable=True)
|
||||
else:
|
||||
self.events = {self.drained_pub: messaging.fake_event_handle(self.drained_pub, enable=True)}
|
||||
self.events = {self.main_pub: messaging.fake_event_handle(self.main_pub, enable=True)}
|
||||
|
||||
return self
|
||||
|
||||
@@ -83,7 +85,7 @@ class ReplayContext:
|
||||
|
||||
def wait_for_next_recv(self, trigger_empty_recv):
|
||||
index = messaging.wait_for_one_event(self.all_recv_called_events)
|
||||
if self.drained_pub is not None and trigger_empty_recv:
|
||||
if self.main_pub is not None and self.main_pub_drained and trigger_empty_recv:
|
||||
self.all_recv_called_events[index].clear()
|
||||
self.all_recv_ready_events[index].set()
|
||||
self.all_recv_called_events[index].wait()
|
||||
@@ -95,16 +97,19 @@ class ProcessConfig:
|
||||
pubs: List[str]
|
||||
subs: List[str]
|
||||
ignore: List[str]
|
||||
config_callback: Optional[Callable]
|
||||
init_callback: Optional[Callable]
|
||||
should_recv_callback: Optional[Callable]
|
||||
config_callback: Optional[Callable] = None
|
||||
init_callback: Optional[Callable] = None
|
||||
should_recv_callback: Optional[Callable] = None
|
||||
tolerance: Optional[float] = None
|
||||
environ: Dict[str, str] = field(default_factory=dict)
|
||||
subtest_name: str = ""
|
||||
field_tolerances: Dict[str, float] = field(default_factory=dict)
|
||||
timeout: int = 30
|
||||
simulation: bool = True
|
||||
drained_pub: Optional[str] = None
|
||||
main_pub: Optional[str] = None
|
||||
main_pub_drained: bool = True
|
||||
vision_pubs: List[str] = field(default_factory=list)
|
||||
ignore_alive_pubs: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class DummySocket:
|
||||
@@ -143,6 +148,7 @@ def controlsd_fingerprint_callback(rc, pm, msgs, fingerprint):
|
||||
|
||||
|
||||
def get_car_params_callback(rc, pm, msgs, fingerprint):
|
||||
params = Params()
|
||||
if fingerprint:
|
||||
CarInterface, _, _ = interfaces[fingerprint]
|
||||
CP = CarInterface.get_non_essential_params(fingerprint)
|
||||
@@ -151,12 +157,15 @@ def get_car_params_callback(rc, pm, msgs, fingerprint):
|
||||
sendcan = DummySocket()
|
||||
|
||||
canmsgs = [msg for msg in msgs if msg.which() == "can"]
|
||||
assert len(canmsgs) != 0, "CAN messages are required for carParams initialization"
|
||||
has_cached_cp = params.get("CarParamsCache") is not None
|
||||
assert len(canmsgs) != 0, "CAN messages are required for fingerprinting"
|
||||
assert os.environ.get("SKIP_FW_QUERY", False) or has_cached_cp, "CarParamsCache is required for fingerprinting. Make sure to keep carParams msgs in the logs."
|
||||
|
||||
for m in canmsgs[:300]:
|
||||
can.send(m.as_builder().to_bytes())
|
||||
_, CP = get_car(can, sendcan, Params().get_bool("ExperimentalLongitudinalEnabled"))
|
||||
Params().put("CarParams", CP.to_bytes())
|
||||
params.put("CarParams", CP.to_bytes())
|
||||
return CP
|
||||
|
||||
|
||||
def controlsd_rcv_callback(msg, cfg, frame):
|
||||
@@ -173,10 +182,6 @@ def controlsd_rcv_callback(msg, cfg, frame):
|
||||
return len(socks) > 0
|
||||
|
||||
|
||||
def radar_rcv_callback(msg, cfg, frame):
|
||||
return msg.which() == "can"
|
||||
|
||||
|
||||
def calibration_rcv_callback(msg, cfg, frame):
|
||||
# calibrationd publishes 1 calibrationData every 5 cameraOdometry packets.
|
||||
# should_recv always true to increment frame
|
||||
@@ -188,6 +193,42 @@ def torqued_rcv_callback(msg, cfg, frame):
|
||||
return (frame - 1) == 0 or msg.which() == 'liveLocationKalman'
|
||||
|
||||
|
||||
def dmonitoringmodeld_rcv_callback(msg, cfg, frame):
|
||||
return msg.which() == "driverCameraState"
|
||||
|
||||
|
||||
class ModeldCameraSyncRcvCallback:
|
||||
def __init__(self):
|
||||
self.road_present = False
|
||||
self.wide_road_present = False
|
||||
self.is_dual_camera = True
|
||||
|
||||
def __call__(self, msg, cfg, frame):
|
||||
if msg.which() == "initData":
|
||||
self.is_dual_camera = msg.initData.deviceType in ["tici", "tizi"]
|
||||
elif msg.which() == "roadCameraState":
|
||||
self.road_present = True
|
||||
elif msg.which() == "wideRoadCameraState":
|
||||
self.wide_road_present = True
|
||||
|
||||
if self.road_present and self.wide_road_present:
|
||||
self.road_present, self.wide_road_present = False, False
|
||||
return True
|
||||
elif self.road_present and not self.is_dual_camera:
|
||||
self.road_present = False
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
class MessageBasedRcvCallback:
|
||||
def __init__(self, trigger_msg_type):
|
||||
self.trigger_msg_type = trigger_msg_type
|
||||
|
||||
def __call__(self, msg, cfg, frame):
|
||||
return msg.which() == self.trigger_msg_type
|
||||
|
||||
|
||||
class FrequencyBasedRcvCallback:
|
||||
def __init__(self, trigger_msg_type):
|
||||
self.trigger_msg_type = trigger_msg_type
|
||||
@@ -205,17 +246,17 @@ class FrequencyBasedRcvCallback:
|
||||
|
||||
def laikad_config_pubsub_callback(params, cfg):
|
||||
ublox = params.get_bool("UbloxAvailable")
|
||||
drained_key = "ubloxGnss" if ublox else "qcomGnss"
|
||||
main_key = "ubloxGnss" if ublox else "qcomGnss"
|
||||
sub_keys = ({"qcomGnss", } if ublox else {"ubloxGnss", })
|
||||
|
||||
return set(cfg.pubs) - sub_keys, drained_key
|
||||
return set(cfg.pubs) - sub_keys, main_key, True
|
||||
|
||||
|
||||
def locationd_config_pubsub_callback(params, cfg):
|
||||
ublox = params.get_bool("UbloxAvailable")
|
||||
sub_keys = ({"gpsLocation", } if ublox else {"gpsLocationExternal", })
|
||||
|
||||
return set(cfg.pubs) - sub_keys, None
|
||||
return set(cfg.pubs) - sub_keys, None, False
|
||||
|
||||
|
||||
CONFIGS = [
|
||||
@@ -229,29 +270,26 @@ CONFIGS = [
|
||||
],
|
||||
subs=["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"],
|
||||
ignore=["logMonoTime", "valid", "controlsState.startMonoTime", "controlsState.cumLagMs"],
|
||||
config_callback=None,
|
||||
init_callback=controlsd_fingerprint_callback,
|
||||
should_recv_callback=controlsd_rcv_callback,
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
simulation=False,
|
||||
drained_pub="can",
|
||||
main_pub="can",
|
||||
),
|
||||
ProcessConfig(
|
||||
proc_name="radard",
|
||||
pubs=["can", "carState", "modelV2"],
|
||||
subs=["radarState", "liveTracks"],
|
||||
ignore=["logMonoTime", "valid", "radarState.cumLagMs"],
|
||||
config_callback=None,
|
||||
init_callback=get_car_params_callback,
|
||||
should_recv_callback=radar_rcv_callback,
|
||||
drained_pub="can",
|
||||
should_recv_callback=MessageBasedRcvCallback("can"),
|
||||
main_pub="can",
|
||||
),
|
||||
ProcessConfig(
|
||||
proc_name="plannerd",
|
||||
pubs=["modelV2", "carControl", "carState", "controlsState", "radarState"],
|
||||
subs=["lateralPlan", "longitudinalPlan", "uiPlan"],
|
||||
ignore=["logMonoTime", "valid", "longitudinalPlan.processingDelay", "longitudinalPlan.solverExecutionTime", "lateralPlan.solverExecutionTime"],
|
||||
config_callback=None,
|
||||
init_callback=get_car_params_callback,
|
||||
should_recv_callback=FrequencyBasedRcvCallback("modelV2"),
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
@@ -261,8 +299,6 @@ CONFIGS = [
|
||||
pubs=["carState", "cameraOdometry", "carParams"],
|
||||
subs=["liveCalibration"],
|
||||
ignore=["logMonoTime", "valid"],
|
||||
config_callback=None,
|
||||
init_callback=None,
|
||||
should_recv_callback=calibration_rcv_callback,
|
||||
),
|
||||
ProcessConfig(
|
||||
@@ -270,8 +306,6 @@ CONFIGS = [
|
||||
pubs=["driverStateV2", "liveCalibration", "carState", "modelV2", "controlsState"],
|
||||
subs=["driverMonitoringState"],
|
||||
ignore=["logMonoTime", "valid"],
|
||||
config_callback=None,
|
||||
init_callback=None,
|
||||
should_recv_callback=FrequencyBasedRcvCallback("driverStateV2"),
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
),
|
||||
@@ -284,8 +318,6 @@ CONFIGS = [
|
||||
subs=["liveLocationKalman"],
|
||||
ignore=["logMonoTime", "valid"],
|
||||
config_callback=locationd_config_pubsub_callback,
|
||||
init_callback=None,
|
||||
should_recv_callback=None,
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
),
|
||||
ProcessConfig(
|
||||
@@ -293,7 +325,6 @@ CONFIGS = [
|
||||
pubs=["liveLocationKalman", "carState"],
|
||||
subs=["liveParameters"],
|
||||
ignore=["logMonoTime", "valid"],
|
||||
config_callback=None,
|
||||
init_callback=get_car_params_callback,
|
||||
should_recv_callback=FrequencyBasedRcvCallback("liveLocationKalman"),
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
@@ -303,9 +334,6 @@ CONFIGS = [
|
||||
pubs=["ubloxRaw"],
|
||||
subs=["ubloxGnss", "gpsLocationExternal"],
|
||||
ignore=["logMonoTime"],
|
||||
config_callback=None,
|
||||
init_callback=None,
|
||||
should_recv_callback=None,
|
||||
),
|
||||
ProcessConfig(
|
||||
proc_name="laikad",
|
||||
@@ -313,22 +341,43 @@ CONFIGS = [
|
||||
subs=["gnssMeasurements"],
|
||||
ignore=["logMonoTime"],
|
||||
config_callback=laikad_config_pubsub_callback,
|
||||
init_callback=None,
|
||||
should_recv_callback=None,
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
timeout=60*10, # first messages are blocked on internet assistance
|
||||
drained_pub="ubloxGnss", # config_callback will switch this to qcom if needed
|
||||
main_pub="ubloxGnss", # config_callback will switch this to qcom if needed
|
||||
),
|
||||
ProcessConfig(
|
||||
proc_name="torqued",
|
||||
pubs=["liveLocationKalman", "carState", "carControl"],
|
||||
subs=["liveTorqueParameters"],
|
||||
ignore=["logMonoTime"],
|
||||
config_callback=None,
|
||||
init_callback=get_car_params_callback,
|
||||
should_recv_callback=torqued_rcv_callback,
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
),
|
||||
ProcessConfig(
|
||||
proc_name="modeld",
|
||||
pubs=["lateralPlan", "roadCameraState", "wideRoadCameraState", "liveCalibration", "driverMonitoringState"],
|
||||
subs=["modelV2", "cameraOdometry"],
|
||||
ignore=["logMonoTime", "modelV2.frameDropPerc", "modelV2.modelExecutionTime"],
|
||||
should_recv_callback=ModeldCameraSyncRcvCallback(),
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
main_pub=vipc_get_endpoint_name("camerad", meta_from_camera_state("roadCameraState").stream),
|
||||
main_pub_drained=False,
|
||||
vision_pubs=["roadCameraState", "wideRoadCameraState"],
|
||||
ignore_alive_pubs=["wideRoadCameraState"],
|
||||
),
|
||||
ProcessConfig(
|
||||
proc_name="dmonitoringmodeld",
|
||||
pubs=["liveCalibration", "driverCameraState"],
|
||||
subs=["driverStateV2"],
|
||||
ignore=["logMonoTime", "driverStateV2.modelExecutionTime", "driverStateV2.dspExecutionTime"],
|
||||
should_recv_callback=dmonitoringmodeld_rcv_callback,
|
||||
tolerance=NUMPY_TOLERANCE,
|
||||
main_pub=vipc_get_endpoint_name("camerad", meta_from_camera_state("driverCameraState").stream),
|
||||
main_pub_drained=False,
|
||||
vision_pubs=["driverCameraState"],
|
||||
ignore_alive_pubs=["driverCameraState"],
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@@ -344,9 +393,9 @@ def replay_process_with_name(name, lr, *args, **kwargs):
|
||||
return replay_process(cfg, lr, *args, **kwargs)
|
||||
|
||||
|
||||
def replay_process(cfg, lr, fingerprint=None, return_all_logs=False, custom_params=None, disable_progress=False):
|
||||
all_msgs = migrate_all(lr, old_logtime=True)
|
||||
process_logs = _replay_single_process(cfg, all_msgs, fingerprint, custom_params or {}, disable_progress)
|
||||
def replay_process(cfg, lr, frs=None, fingerprint=None, return_all_logs=False, custom_params=None, disable_progress=False):
|
||||
all_msgs = migrate_all(lr, old_logtime=True, camera_states=len(cfg.vision_pubs) != 0)
|
||||
process_logs = _replay_single_process(cfg, all_msgs, frs, fingerprint, custom_params, disable_progress)
|
||||
|
||||
if return_all_logs:
|
||||
keys = set(cfg.subs)
|
||||
@@ -361,7 +410,7 @@ def replay_process(cfg, lr, fingerprint=None, return_all_logs=False, custom_para
|
||||
|
||||
|
||||
def _replay_single_process(
|
||||
cfg: ProcessConfig, lr: Union[LogReader, List[capnp._DynamicStructReader]],
|
||||
cfg: ProcessConfig, lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]],
|
||||
fingerprint: Optional[str], custom_params: Optional[Dict[str, Any]], disable_progress: bool
|
||||
):
|
||||
with OpenpilotPrefix():
|
||||
@@ -383,11 +432,11 @@ def _replay_single_process(
|
||||
else:
|
||||
CP = next((m.carParams for m in lr if m.which() == "carParams"), None)
|
||||
assert CP is not None or "carParams" not in cfg.pubs, "carParams are missing and process needs it"
|
||||
setup_env(CP=CP, cfg=cfg, controlsState=controlsState, lr=lr, custom_params=custom_params)
|
||||
setup_env(cfg=cfg, CP=CP, controlsState=controlsState, lr=lr, custom_params=custom_params)
|
||||
|
||||
if cfg.config_callback is not None:
|
||||
params = Params()
|
||||
cfg.pubs, cfg.drained_pub = cfg.config_callback(params, cfg)
|
||||
cfg.pubs, cfg.main_pub, cfg.main_pub_drained = cfg.config_callback(params, cfg)
|
||||
|
||||
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
|
||||
pub_msgs = [msg for msg in all_msgs if msg.which() in set(cfg.pubs)]
|
||||
@@ -396,6 +445,13 @@ def _replay_single_process(
|
||||
pm = messaging.PubMaster(cfg.pubs)
|
||||
sockets = {s: messaging.sub_sock(s, timeout=100) for s in cfg.subs}
|
||||
|
||||
vipc_server = None
|
||||
if len(cfg.vision_pubs) != 0:
|
||||
assert frs is not None, "frs must be provided when replaying process using vision streams"
|
||||
assert all(meta_from_camera_state(st) is not None for st in cfg.vision_pubs),f"undefined vision stream spotted, probably misconfigured process: {cfg.vision_pubs}"
|
||||
assert all(st in frs for st in cfg.vision_pubs), f"frs for this process must contain following vision streams: {cfg.vision_pubs}"
|
||||
vipc_server = setup_vision_ipc(cfg, lr)
|
||||
|
||||
managed_processes[cfg.proc_name].prepare()
|
||||
managed_processes[cfg.proc_name].start()
|
||||
|
||||
@@ -406,7 +462,7 @@ def _replay_single_process(
|
||||
try:
|
||||
# Wait for process to startup
|
||||
with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(cfg.proc_name)}"):
|
||||
while not all(pm.all_readers_updated(s) for s in cfg.pubs):
|
||||
while not all(pm.all_readers_updated(s) for s in cfg.pubs if s not in cfg.ignore_alive_pubs):
|
||||
time.sleep(0)
|
||||
|
||||
# Do the replay
|
||||
@@ -428,11 +484,19 @@ def _replay_single_process(
|
||||
|
||||
# empty recv on drained pub indicates the end of messages, only do that if there're any
|
||||
trigger_empty_recv = False
|
||||
if cfg.drained_pub:
|
||||
trigger_empty_recv = next((True for m in msg_queue if m.which() == cfg.drained_pub), False)
|
||||
if cfg.main_pub and cfg.main_pub_drained:
|
||||
trigger_empty_recv = next((True for m in msg_queue if m.which() == cfg.main_pub), False)
|
||||
|
||||
for m in msg_queue:
|
||||
pm.send(m.which(), m.as_builder())
|
||||
# send frames if needed
|
||||
if vipc_server is not None and m.which() in cfg.vision_pubs:
|
||||
camera_state = getattr(m, m.which())
|
||||
camera_meta = meta_from_camera_state(m.which())
|
||||
assert frs is not None
|
||||
img = frs[m.which()].get(camera_state.frameId, pix_fmt="nv12")[0]
|
||||
vipc_server.send(camera_meta.stream, img.flatten().tobytes(),
|
||||
camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof)
|
||||
msg_queue = []
|
||||
|
||||
rc.unlock_sockets()
|
||||
@@ -454,6 +518,21 @@ def _replay_single_process(
|
||||
return log_msgs
|
||||
|
||||
|
||||
def setup_vision_ipc(cfg, lr):
|
||||
assert len(cfg.vision_pubs) != 0
|
||||
|
||||
device_type = next(msg.initData.deviceType for msg in lr if msg.which() == "initData")
|
||||
|
||||
vipc_server = VisionIpcServer("camerad")
|
||||
streams_metas = available_streams(lr)
|
||||
for meta in streams_metas:
|
||||
if meta.camera_state in cfg.vision_pubs:
|
||||
vipc_server.create_buffers(meta.stream, 2, False, *meta.frame_sizes[device_type])
|
||||
vipc_server.start_listener()
|
||||
|
||||
return vipc_server
|
||||
|
||||
|
||||
def setup_env(cfg=None, CP=None, controlsState=None, lr=None, fingerprint=None, custom_params=None, log_dir=None):
|
||||
if platform.system() != "Darwin":
|
||||
os.environ["PARAMS_ROOT"] = "/dev/shm/params"
|
||||
|
||||
@@ -65,6 +65,7 @@ excluded_interfaces = ["mock", "mazda", "tesla"]
|
||||
|
||||
BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/"
|
||||
REF_COMMIT_FN = os.path.join(PROC_REPLAY_DIR, "ref_commit")
|
||||
EXCLUDED_PROCS = {"modeld", "dmonitoringmodeld"}
|
||||
|
||||
|
||||
def run_test_process(data):
|
||||
@@ -155,7 +156,7 @@ def format_diff(results, log_paths, ref_commit):
|
||||
|
||||
if __name__ == "__main__":
|
||||
all_cars = {car for car, _ in segments}
|
||||
all_procs = {cfg.proc_name for cfg in CONFIGS}
|
||||
all_procs = {cfg.proc_name for cfg in CONFIGS if cfg.proc_name not in EXCLUDED_PROCS}
|
||||
|
||||
parser = argparse.ArgumentParser(description="Regression test to identify changes in a process's output")
|
||||
parser.add_argument("--whitelist-procs", type=str, nargs="*", default=all_procs,
|
||||
|
||||
43
selfdrive/test/process_replay/vision_meta.py
Normal file
43
selfdrive/test/process_replay/vision_meta.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from collections import namedtuple
|
||||
from cereal.visionipc import VisionStreamType
|
||||
from common.realtime import DT_MDL, DT_DMON
|
||||
from common.transformations.camera import tici_f_frame_size, tici_d_frame_size, tici_e_frame_size, eon_f_frame_size, eon_d_frame_size
|
||||
|
||||
VideoStreamMeta = namedtuple("VideoStreamMeta", ["camera_state", "encode_index", "stream", "dt", "frame_sizes"])
|
||||
ROAD_CAMERA_FRAME_SIZES = {"tici": tici_f_frame_size, "tizi": tici_f_frame_size, "eon": eon_f_frame_size}
|
||||
WIDE_ROAD_CAMERA_FRAME_SIZES = {"tici": tici_e_frame_size, "tizi": tici_e_frame_size}
|
||||
DRIVER_FRAME_SIZES = {"tici": tici_d_frame_size, "tizi": tici_d_frame_size, "eon": eon_d_frame_size}
|
||||
VIPC_STREAM_METADATA = [
|
||||
# metadata: (state_msg_type, encode_msg_type, stream_type, dt, frame_sizes)
|
||||
("roadCameraState", "roadEncodeIdx", VisionStreamType.VISION_STREAM_ROAD, DT_MDL, ROAD_CAMERA_FRAME_SIZES),
|
||||
("wideRoadCameraState", "wideRoadEncodeIdx", VisionStreamType.VISION_STREAM_WIDE_ROAD, DT_MDL, WIDE_ROAD_CAMERA_FRAME_SIZES),
|
||||
("driverCameraState", "driverEncodeIdx", VisionStreamType.VISION_STREAM_DRIVER, DT_DMON, DRIVER_FRAME_SIZES),
|
||||
]
|
||||
|
||||
|
||||
def meta_from_camera_state(state):
|
||||
meta = next((VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA if meta[0] == state), None)
|
||||
return meta
|
||||
|
||||
|
||||
def meta_from_encode_index(encode_index):
|
||||
meta = next((VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA if meta[1] == encode_index), None)
|
||||
return meta
|
||||
|
||||
|
||||
def meta_from_stream_type(stream_type):
|
||||
meta = next((VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA if meta[2] == stream_type), None)
|
||||
return meta
|
||||
|
||||
|
||||
def available_streams(lr=None):
|
||||
if lr is None:
|
||||
return [VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA]
|
||||
|
||||
result = []
|
||||
for meta in VIPC_STREAM_METADATA:
|
||||
has_cam_state = next((True for m in lr if m.which() == meta[0]), False)
|
||||
if has_cam_state:
|
||||
result.append(VideoStreamMeta(*meta))
|
||||
|
||||
return result
|
||||
Reference in New Issue
Block a user