diff --git a/.gitignore b/.gitignore
index 6c6810db..d66bd7dc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,7 @@
.sconsign.dblite
.hypothesis
*.egg-info/
+*.html
uv.lock
opendbc/can/*.so
diff --git a/examples/longitudinal-profiles.py b/examples/longitudinal-profiles.py
new file mode 100755
index 00000000..2e733744
--- /dev/null
+++ b/examples/longitudinal-profiles.py
@@ -0,0 +1,156 @@
+#!/usr/bin/env python3
+import io
+import time
+import base64
+import argparse
+import numpy as np
+import matplotlib.pyplot as plt
+from collections import defaultdict
+from dataclasses import dataclass, asdict
+from pathlib import Path
+
+from opendbc.car.structs import CarControl
+from opendbc.car.panda_runner import PandaRunner
+
+DT = 0.01 # step time (s)
+
+@dataclass
+class Action:
+ accel: float # m/s^2
+ duration: float # seconds
+ longControlState: CarControl.Actuators.LongControlState = CarControl.Actuators.LongControlState.pid
+
+ def get_msgs(self):
+ return [
+ (t, CarControl(
+ enabled=True,
+ longActive=True,
+ actuators=CarControl.Actuators(
+ accel=self.accel,
+ longControlState=self.longControlState,
+ ),
+ ))
+ for t in np.linspace(0, self.duration, int(self.duration/DT))
+ ]
+
+@dataclass
+class Maneuver:
+ description: str
+ actions: list[Action]
+
+ def get_msgs(self):
+ t0 = 0
+ for action in self.actions:
+ for lt, msg in action.get_msgs():
+ yield lt + t0, msg
+ t0 += lt
+
+MANEUVERS = [
+ Maneuver(
+ "creeping: alternate between +1m/ss and -1m/ss",
+ [
+ Action(1, 2), Action(-1, 2),
+ Action(1, 2), Action(-1, 2),
+ Action(1, 2), Action(-1, 2),
+ ],
+ ),
+]
+
+def main(args):
+ with PandaRunner() as p:
+ print("\n\n")
+
+ logs = {}
+ for i, m in enumerate(MANEUVERS):
+ print(f"Running {i+1}/{len(MANEUVERS)} '{m.description}'")
+
+ print("- setting up")
+ good_cnt = 0
+ for _ in range(int(30./DT)):
+ cs = p.read(strict=False)
+
+ cc = CarControl(
+ enabled=True,
+ longActive=True,
+ actuators=CarControl.Actuators(accel=-1.5, longControlState=CarControl.Actuators.LongControlState.stopping),
+ )
+ p.write(cc)
+
+ good_cnt = (good_cnt+1) if cs.vEgo < 0.1 and cs.cruiseState.enabled and not cs.cruiseState.standstill else 0
+ if good_cnt > (2./DT):
+ break
+ time.sleep(DT)
+ else:
+ print("ERROR: failed to setup")
+ continue
+
+ print("- executing maneuver")
+ logs[m.description] = defaultdict(list)
+ for t, cc in m.get_msgs():
+ cs = p.read()
+ p.write(cc)
+
+ logs[m.description]["t"].append(t)
+ to_log = {"carControl": cc, "carState": cs, "carControl.actuators": cc.actuators,
+ "carControl.cruiseControl": cc.cruiseControl, "carState.cruiseState": cs.cruiseState}
+ for k, v in to_log.items():
+ for k2, v2 in asdict(v).items():
+ logs[m.description][f"{k}.{k2}"].append(v2)
+
+ time.sleep(DT)
+
+ # ***** write out report *****
+
+ output_path = Path(__file__).resolve().parent / "longitudinal_reports"
+ output_fn = args.output or output_path / f"{p.CI.CP.carFingerprint}_{time.strftime('%Y%m%d-%H_%M_%S')}.html"
+ output_path.mkdir(exist_ok=True)
+ with open(output_fn, "w") as f:
+ f.write("
Longitudinal maneuver report
\n")
+ f.write(f"{p.CI.CP.carFingerprint}
\n")
+ if args.desc:
+ f.write(f"{args.desc}
")
+ for m in MANEUVERS:
+ f.write("\n")
+ f.write(f"{m.description}
\n")
+
+ log = logs[m.description]
+
+ plt.rcParams['font.size'] = 40
+ fig = plt.figure(figsize=(30, 20))
+ ax = fig.subplots(3, 1, sharex=True, gridspec_kw={'hspace': 0, 'height_ratios': [5, 1, 1]})
+
+ ax[0].grid(linewidth=4)
+ ax[0].plot(log["t"], log["carState.aEgo"], label='aEgo', linewidth=6)
+ ax[0].plot(log["t"], log["carControl.actuators.accel"], label='accel command', linewidth=6)
+ ax[0].set_ylabel('Acceleration (m/s^2)')
+ ax[0].set_ylim(-4.5, 4.5)
+ ax[0].legend()
+
+ ax[1].plot(log["t"], log["carControl.enabled"], label='enabled', linewidth=6)
+ ax[2].plot(log["t"], log["carState.gasPressed"], label='gasPressed', linewidth=6)
+ for i in (1, 2):
+ ax[i].set_yticks([0, 1], minor=False)
+ ax[i].set_ylim(-1, 2)
+ ax[i].legend()
+
+ ax[-1].set_xlabel("Time (s)")
+ fig.tight_layout()
+
+ buffer = io.BytesIO()
+ fig.savefig(buffer, format='png')
+ buffer.seek(0)
+ f.write(f"\n")
+
+ print(f"\nReport written to {output_fn}\n")
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description="A tool for longitudinal control testing.",
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser.add_argument('--desc', help="Extra description to include in report.")
+ parser.add_argument('--output', help="Write out report to this file.", default=None)
+ args = parser.parse_args()
+
+ assert args.output is None or args.output.endswith(".html"), "Output filename must end with '.html'"
+
+ main(args)
\ No newline at end of file
diff --git a/opendbc/car/panda_runner.py b/opendbc/car/panda_runner.py
index 02fd3ff2..2e6221de 100644
--- a/opendbc/car/panda_runner.py
+++ b/opendbc/car/panda_runner.py
@@ -1,35 +1,57 @@
-from contextlib import contextmanager
+import time
+from contextlib import AbstractContextManager
from panda import Panda
from opendbc.car.car_helpers import get_car
from opendbc.car.can_definitions import CanData
+from opendbc.car.structs import CarControl
-@contextmanager
-def PandaRunner():
- p = Panda()
+class PandaRunner(AbstractContextManager):
+ def __enter__(self):
+ self.p = Panda()
+ self.p.reset()
- def _can_recv(wait_for_one: bool = False) -> list[list[CanData]]:
- recv = p.can_recv()
+ # setup + fingerprinting
+ self.p.set_safety_mode(Panda.SAFETY_ELM327, 1)
+ self.CI = get_car(self._can_recv, self.p.can_send_many, self.p.set_obd, True)
+ print("fingerprinted", self.CI.CP.carName)
+ assert self.CI.CP.carFingerprint != "mock", "Unable to identify car. Check connections and ensure car is supported."
+
+ self.p.set_safety_mode(Panda.SAFETY_ELM327, 1)
+ self.CI.init(self.CI.CP, self._can_recv, self.p.can_send_many)
+ self.p.set_safety_mode(Panda.SAFETY_TOYOTA, self.CI.CP.safetyConfigs[0].safetyParam)
+
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.p.set_safety_mode(Panda.SAFETY_NOOUTPUT)
+ self.p.reset() # avoid siren
+ return super().__exit__(exc_type, exc_value, traceback)
+
+ @property
+ def panda(self) -> Panda:
+ return self.p
+
+ def _can_recv(self, wait_for_one: bool = False) -> list[list[CanData]]:
+ recv = self.p.can_recv()
while len(recv) == 0 and wait_for_one:
- recv = p.can_recv()
+ recv = self.p.can_recv()
return [[CanData(addr, dat, bus) for addr, dat, bus in recv], ]
- try:
- # setup + fingerprinting
- p.set_safety_mode(Panda.SAFETY_ELM327, 1)
- CI = get_car(_can_recv, p.can_send_many, p.set_obd, True)
- print("fingerprinted", CI.CP.carName)
- assert CI.CP.carFingerprint != "mock", "Unable to identify car. Check connections and ensure car is supported."
-
- p.set_safety_mode(Panda.SAFETY_ELM327, 1)
- CI.init(CI.CP, _can_recv, p.can_send_many)
- p.set_safety_mode(Panda.SAFETY_TOYOTA, CI.CP.safetyConfigs[0].safetyParam)
-
- yield p, CI
- finally:
- p.set_safety_mode(Panda.SAFETY_NOOUTPUT)
+ def read(self, strict: bool = True):
+ cs = self.CI.update([int(time.monotonic()*1e9), self._can_recv()[0]])
+ if strict:
+ assert cs.canValid, "CAN went invalid, check connections"
+ return cs
+ def write(self, cc: CarControl) -> None:
+ if cc.enabled and not self.p.health()['controls_allowed']:
+ # prevent the car from faulting. print a warning?
+ cc = CarControl(enabled=False)
+ _, can_sends = self.CI.apply(cc)
+ self.p.can_send_many(can_sends, timeout=25)
+ self.p.send_heartbeat()
if __name__ == "__main__":
- with PandaRunner() as (p, CI):
- print(p.can_recv())
\ No newline at end of file
+ with PandaRunner() as p:
+ print(p.read())
diff --git a/pyproject.toml b/pyproject.toml
index 494635e7..945ee631 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,6 +35,7 @@ docs = [
]
examples = [
"inputs",
+ "matplotlib",
]
[tool.pytest.ini_options]