Files
sunnypilot/selfdrive/test/process_replay/helpers.py
Kacper Rączy 547a033a3c process_replay: capture process output (#29027)
* Add ProcessOutputProxy

* Move launcher to its own field

* Move ProcessOutputCapture to its own file

* Return itself from __enter__ of OpenpilotPrefix

* Integrate ProcessOutputCapture into process_replay

* Add note about capture_output_store to README

* ipykernel import is optional

* Disable type checking for link_with_current_proc

* Remove assertion

* Decode outputs to utf-8

* read(self): return empty buf if its none

* Fix type annotations

* Replace fifo with regular file, to avoid hitting fifo size limit
2023-07-31 16:30:58 -07:00

50 lines
1.2 KiB
Python

import os
import shutil
import uuid
from typing import List, Optional
from common.params import Params
class OpenpilotPrefix(object):
def __init__(self, prefix: Optional[str] = None, clean_dirs_on_exit: bool = True):
self.prefix = prefix if prefix else str(uuid.uuid4())
self.msgq_path = os.path.join('/dev/shm', self.prefix)
self.clean_dirs_on_exit = clean_dirs_on_exit
def __enter__(self):
os.environ['OPENPILOT_PREFIX'] = self.prefix
try:
os.mkdir(self.msgq_path)
except FileExistsError:
pass
return self
def __exit__(self, exc_type, exc_obj, exc_tb):
if self.clean_dirs_on_exit:
self.clean_dirs()
del os.environ['OPENPILOT_PREFIX']
return False
def clean_dirs(self):
symlink_path = Params().get_param_path()
if os.path.exists(symlink_path):
shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True)
os.remove(symlink_path)
shutil.rmtree(self.msgq_path, ignore_errors=True)
class DummySocket:
def __init__(self):
self.data: List[bytes] = []
def receive(self, non_blocking: bool = False) -> Optional[bytes]:
if non_blocking:
return None
return self.data.pop()
def send(self, data: bytes):
self.data.append(data)