mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-02-19 07:43:57 +08:00
process_replay: capture process output (#29027)
* Add ProcessOutputProxy * Move launcher to its own field * Move ProcessOutputCapture to its own file * Return itself from __enter__ of OpenpilotPrefix * Integrate ProcessOutputCapture into process_replay * Add note about capture_output_store to README * ipykernel import is optional * Disable type checking for link_with_current_proc * Remove assertion * Decode outputs to utf-8 * read(self): return empty buf if its none * Fix type annotations * Replace fifo with regular file, to avoid hitting fifo size limit
This commit is contained in:
@@ -196,6 +196,7 @@ class NativeProcess(ManagerProcess):
|
||||
self.unkillable = unkillable
|
||||
self.sigkill = sigkill
|
||||
self.watchdog_max_dt = watchdog_max_dt
|
||||
self.launcher = nativelauncher
|
||||
|
||||
def prepare(self) -> None:
|
||||
pass
|
||||
@@ -210,7 +211,7 @@ class NativeProcess(ManagerProcess):
|
||||
|
||||
cwd = os.path.join(BASEDIR, self.cwd)
|
||||
cloudlog.info(f"starting process {self.name}")
|
||||
self.proc = Process(name=self.name, target=nativelauncher, args=(self.cmdline, cwd, self.name))
|
||||
self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name))
|
||||
self.proc.start()
|
||||
self.watchdog_seen = False
|
||||
self.shutting_down = False
|
||||
@@ -227,6 +228,7 @@ class PythonProcess(ManagerProcess):
|
||||
self.unkillable = unkillable
|
||||
self.sigkill = sigkill
|
||||
self.watchdog_max_dt = watchdog_max_dt
|
||||
self.launcher = launcher
|
||||
|
||||
def prepare(self) -> None:
|
||||
if self.enabled:
|
||||
@@ -242,7 +244,7 @@ class PythonProcess(ManagerProcess):
|
||||
return
|
||||
|
||||
cloudlog.info(f"starting python {self.module}")
|
||||
self.proc = Process(name=self.name, target=launcher, args=(self.module, self.name))
|
||||
self.proc = Process(name=self.name, target=self.launcher, args=(self.module, self.name))
|
||||
self.proc.start()
|
||||
self.watchdog_seen = False
|
||||
self.shutting_down = False
|
||||
|
||||
@@ -113,4 +113,16 @@ frs = {
|
||||
}
|
||||
|
||||
output_logs = replay_process_with_name(['modeld', 'dmonitoringmodeld'], lr, frs=frs)
|
||||
```
|
||||
|
||||
To capture stdout/stderr of the replayed process, `captured_output_store` can be provided.
|
||||
|
||||
```py
|
||||
output_store = dict()
|
||||
# pass dictionary by reference, it will be filled with standard outputs - even if process replay fails
|
||||
output_logs = replay_process_with_name(['radard', 'plannerd'], lr, captured_output_store=output_store)
|
||||
|
||||
# entries with captured output in format { 'out': '...', 'err': '...' } will be added to provided dictionary for each replayed process
|
||||
print(output_store['radard']['out']) # radard stdout
|
||||
print(output_store['radard']['err']) # radard stderr
|
||||
```
|
||||
59
selfdrive/test/process_replay/capture.py
Normal file
59
selfdrive/test/process_replay/capture.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
from typing import Tuple, no_type_check
|
||||
|
||||
class FdRedirect:
|
||||
def __init__(self, file_prefix: str, fd: int):
|
||||
fname = os.path.join("/tmp", f"{file_prefix}.{fd}")
|
||||
if os.path.exists(fname):
|
||||
os.unlink(fname)
|
||||
self.dest_fd = os.open(fname, os.O_WRONLY | os.O_CREAT)
|
||||
self.dest_fname = fname
|
||||
self.source_fd = fd
|
||||
os.set_inheritable(self.dest_fd, True)
|
||||
|
||||
def __del__(self):
|
||||
os.close(self.dest_fd)
|
||||
|
||||
def purge(self) -> None:
|
||||
os.unlink(self.dest_fname)
|
||||
|
||||
def read(self) -> bytes:
|
||||
with open(self.dest_fname, "rb") as f:
|
||||
return f.read() or b""
|
||||
|
||||
def link(self) -> None:
|
||||
os.dup2(self.dest_fd, self.source_fd)
|
||||
|
||||
|
||||
class ProcessOutputCapture:
|
||||
def __init__(self, proc_name: str, prefix: str):
|
||||
prefix = f"{proc_name}_{prefix}"
|
||||
self.stdout_redirect = FdRedirect(prefix, 1)
|
||||
self.stderr_redirect = FdRedirect(prefix, 2)
|
||||
|
||||
def __del__(self):
|
||||
self.stdout_redirect.purge()
|
||||
self.stderr_redirect.purge()
|
||||
|
||||
@no_type_check # ipython classes have incompatible signatures
|
||||
def link_with_current_proc(self) -> None:
|
||||
try:
|
||||
# prevent ipykernel from redirecting stdout/stderr of python subprocesses
|
||||
from ipykernel.iostream import OutStream
|
||||
if isinstance(sys.stdout, OutStream):
|
||||
sys.stdout = sys.__stdout__
|
||||
if isinstance(sys.stderr, OutStream):
|
||||
sys.stderr = sys.__stderr__
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# link stdout/stderr to the fifo
|
||||
self.stdout_redirect.link()
|
||||
self.stderr_redirect.link()
|
||||
|
||||
def read_outerr(self) -> Tuple[str, str]:
|
||||
out_str = self.stdout_redirect.read().decode()
|
||||
err_str = self.stderr_redirect.read().decode()
|
||||
return out_str, err_str
|
||||
@@ -19,6 +19,8 @@ class OpenpilotPrefix(object):
|
||||
except FileExistsError:
|
||||
pass
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_obj, exc_tb):
|
||||
if self.clean_dirs_on_exit:
|
||||
self.clean_dirs()
|
||||
|
||||
@@ -25,6 +25,7 @@ from selfdrive.manager.process_config import managed_processes
|
||||
from selfdrive.test.process_replay.helpers import OpenpilotPrefix, DummySocket
|
||||
from selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams
|
||||
from selfdrive.test.process_replay.migration import migrate_all
|
||||
from selfdrive.test.process_replay.capture import ProcessOutputCapture
|
||||
from tools.lib.logreader import LogReader
|
||||
|
||||
# Numpy gives different results based on CPU features after version 19
|
||||
@@ -33,6 +34,16 @@ PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/")
|
||||
|
||||
|
||||
class LauncherWithCapture:
|
||||
def __init__(self, capture: ProcessOutputCapture, launcher: Callable):
|
||||
self.capture = capture
|
||||
self.launcher = launcher
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
self.capture.link_with_current_proc()
|
||||
self.launcher(*args, **kwargs)
|
||||
|
||||
|
||||
class ReplayContext:
|
||||
def __init__(self, cfg):
|
||||
self.proc_name = cfg.proc_name
|
||||
@@ -126,13 +137,14 @@ class ProcessContainer:
|
||||
def __init__(self, cfg: ProcessConfig):
|
||||
self.prefix = OpenpilotPrefix(clean_dirs_on_exit=False)
|
||||
self.cfg = copy.deepcopy(cfg)
|
||||
self.process = managed_processes[cfg.proc_name]
|
||||
self.process = copy.deepcopy(managed_processes[cfg.proc_name])
|
||||
self.msg_queue: List[capnp._DynamicStructReader] = []
|
||||
self.cnt = 0
|
||||
self.pm: Optional[messaging.PubMaster] = None
|
||||
self.sockets: Optional[List[messaging.SubSocket]] = None
|
||||
self.rc: Optional[ReplayContext] = None
|
||||
self.vipc_server: Optional[VisionIpcServer] = None
|
||||
self.capture: Optional[ProcessOutputCapture] = None
|
||||
|
||||
@property
|
||||
def has_empty_queue(self) -> bool:
|
||||
@@ -180,11 +192,18 @@ class ProcessContainer:
|
||||
|
||||
self.vipc_server = vipc_server
|
||||
|
||||
def _start_process(self):
|
||||
if self.capture is not None:
|
||||
self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher)
|
||||
self.process.prepare()
|
||||
self.process.start()
|
||||
|
||||
def start(
|
||||
self, params_config: Dict[str, Any], environ_config: Dict[str, Any],
|
||||
all_msgs: Union[LogReader, List[capnp._DynamicStructReader]], fingerprint: Optional[str]
|
||||
self, params_config: Dict[str, Any], environ_config: Dict[str, Any],
|
||||
all_msgs: Union[LogReader, List[capnp._DynamicStructReader]],
|
||||
fingerprint: Optional[str], capture_output: bool
|
||||
):
|
||||
with self.prefix:
|
||||
with self.prefix as p:
|
||||
self._setup_env(params_config, environ_config)
|
||||
|
||||
if self.cfg.config_callback is not None:
|
||||
@@ -201,8 +220,10 @@ class ProcessContainer:
|
||||
self._setup_vision_ipc(all_msgs)
|
||||
assert self.vipc_server is not None
|
||||
|
||||
self.process.prepare()
|
||||
self.process.start()
|
||||
if capture_output:
|
||||
self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix)
|
||||
|
||||
self._start_process()
|
||||
|
||||
if self.cfg.init_callback is not None:
|
||||
self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint)
|
||||
@@ -598,7 +619,8 @@ def replay_process_with_name(name: Union[str, Iterable[str]], lr: Union[LogReade
|
||||
|
||||
def replay_process(
|
||||
cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None,
|
||||
fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, disable_progress: bool = False
|
||||
fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None,
|
||||
captured_output_store: Optional[Dict[str, Dict[str, str]]] = None, disable_progress: bool = False
|
||||
) -> List[capnp._DynamicStructReader]:
|
||||
if isinstance(cfg, Iterable):
|
||||
cfgs = list(cfg)
|
||||
@@ -606,7 +628,7 @@ def replay_process(
|
||||
cfgs = [cfg]
|
||||
|
||||
all_msgs = migrate_all(lr, old_logtime=True, camera_states=any(len(cfg.vision_pubs) != 0 for cfg in cfgs))
|
||||
process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, disable_progress)
|
||||
process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, captured_output_store, disable_progress)
|
||||
|
||||
if return_all_logs:
|
||||
keys = {m.which() for m in process_logs}
|
||||
@@ -621,8 +643,8 @@ def replay_process(
|
||||
|
||||
|
||||
def _replay_multi_process(
|
||||
cfgs: List[ProcessConfig], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]],
|
||||
fingerprint: Optional[str], custom_params: Optional[Dict[str, Any]], disable_progress: bool
|
||||
cfgs: List[ProcessConfig], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]], fingerprint: Optional[str],
|
||||
custom_params: Optional[Dict[str, Any]], captured_output_store: Optional[Dict[str, Dict[str, str]]], disable_progress: bool
|
||||
) -> List[capnp._DynamicStructReader]:
|
||||
if fingerprint is not None:
|
||||
params_config = generate_params_config(lr=lr, fingerprint=fingerprint, custom_params=custom_params)
|
||||
@@ -647,7 +669,7 @@ def _replay_multi_process(
|
||||
containers = []
|
||||
for cfg in cfgs:
|
||||
container = ProcessContainer(cfg)
|
||||
container.start(params_config, env_config, all_msgs, fingerprint)
|
||||
container.start(params_config, env_config, all_msgs, fingerprint, captured_output_store is not None)
|
||||
containers.append(container)
|
||||
|
||||
all_pubs = set([pub for container in containers for pub in container.pubs])
|
||||
@@ -682,6 +704,10 @@ def _replay_multi_process(
|
||||
finally:
|
||||
for container in containers:
|
||||
container.stop()
|
||||
if captured_output_store is not None:
|
||||
assert container.capture is not None
|
||||
out, err = container.capture.read_outerr()
|
||||
captured_output_store[container.cfg.proc_name] = {"out": out, "err": err}
|
||||
|
||||
return log_msgs
|
||||
|
||||
|
||||
Reference in New Issue
Block a user