longitudinal profile runner (#1197)

* long profiles

* start with creep

* lil cleanup

* corolla updates

* cleanup

* 2s

* plot is a little nicer

* strict mode

* cleanup

* unused

* fix that

---------

Co-authored-by: Comma Device <device@comma.ai>
This commit is contained in:
Adeeb Shihadeh 2024-08-30 13:48:13 -07:00 committed by GitHub
parent cbad7f0066
commit f2fa755ab7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 203 additions and 23 deletions

1
.gitignore vendored
View File

@ -10,6 +10,7 @@
.sconsign.dblite
.hypothesis
*.egg-info/
*.html
uv.lock
opendbc/can/*.so

156
examples/longitudinal-profiles.py Executable file
View File

@ -0,0 +1,156 @@
#!/usr/bin/env python3
import io
import time
import base64
import argparse
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
from dataclasses import dataclass, asdict
from pathlib import Path
from opendbc.car.structs import CarControl
from opendbc.car.panda_runner import PandaRunner
DT = 0.01 # step time (s)
@dataclass
class Action:
accel: float # m/s^2
duration: float # seconds
longControlState: CarControl.Actuators.LongControlState = CarControl.Actuators.LongControlState.pid
def get_msgs(self):
return [
(t, CarControl(
enabled=True,
longActive=True,
actuators=CarControl.Actuators(
accel=self.accel,
longControlState=self.longControlState,
),
))
for t in np.linspace(0, self.duration, int(self.duration/DT))
]
@dataclass
class Maneuver:
description: str
actions: list[Action]
def get_msgs(self):
t0 = 0
for action in self.actions:
for lt, msg in action.get_msgs():
yield lt + t0, msg
t0 += lt
MANEUVERS = [
Maneuver(
"creeping: alternate between +1m/ss and -1m/ss",
[
Action(1, 2), Action(-1, 2),
Action(1, 2), Action(-1, 2),
Action(1, 2), Action(-1, 2),
],
),
]
def main(args):
with PandaRunner() as p:
print("\n\n")
logs = {}
for i, m in enumerate(MANEUVERS):
print(f"Running {i+1}/{len(MANEUVERS)} '{m.description}'")
print("- setting up")
good_cnt = 0
for _ in range(int(30./DT)):
cs = p.read(strict=False)
cc = CarControl(
enabled=True,
longActive=True,
actuators=CarControl.Actuators(accel=-1.5, longControlState=CarControl.Actuators.LongControlState.stopping),
)
p.write(cc)
good_cnt = (good_cnt+1) if cs.vEgo < 0.1 and cs.cruiseState.enabled and not cs.cruiseState.standstill else 0
if good_cnt > (2./DT):
break
time.sleep(DT)
else:
print("ERROR: failed to setup")
continue
print("- executing maneuver")
logs[m.description] = defaultdict(list)
for t, cc in m.get_msgs():
cs = p.read()
p.write(cc)
logs[m.description]["t"].append(t)
to_log = {"carControl": cc, "carState": cs, "carControl.actuators": cc.actuators,
"carControl.cruiseControl": cc.cruiseControl, "carState.cruiseState": cs.cruiseState}
for k, v in to_log.items():
for k2, v2 in asdict(v).items():
logs[m.description][f"{k}.{k2}"].append(v2)
time.sleep(DT)
# ***** write out report *****
output_path = Path(__file__).resolve().parent / "longitudinal_reports"
output_fn = args.output or output_path / f"{p.CI.CP.carFingerprint}_{time.strftime('%Y%m%d-%H_%M_%S')}.html"
output_path.mkdir(exist_ok=True)
with open(output_fn, "w") as f:
f.write("<h1>Longitudinal maneuver report</h1>\n")
f.write(f"<h3>{p.CI.CP.carFingerprint}</h3>\n")
if args.desc:
f.write(f"<h3>{args.desc}</h3>")
for m in MANEUVERS:
f.write("<div style='border-top: 1px solid #000; margin: 20px 0;'></div>\n")
f.write(f"<h2>{m.description}</h2>\n")
log = logs[m.description]
plt.rcParams['font.size'] = 40
fig = plt.figure(figsize=(30, 20))
ax = fig.subplots(3, 1, sharex=True, gridspec_kw={'hspace': 0, 'height_ratios': [5, 1, 1]})
ax[0].grid(linewidth=4)
ax[0].plot(log["t"], log["carState.aEgo"], label='aEgo', linewidth=6)
ax[0].plot(log["t"], log["carControl.actuators.accel"], label='accel command', linewidth=6)
ax[0].set_ylabel('Acceleration (m/s^2)')
ax[0].set_ylim(-4.5, 4.5)
ax[0].legend()
ax[1].plot(log["t"], log["carControl.enabled"], label='enabled', linewidth=6)
ax[2].plot(log["t"], log["carState.gasPressed"], label='gasPressed', linewidth=6)
for i in (1, 2):
ax[i].set_yticks([0, 1], minor=False)
ax[i].set_ylim(-1, 2)
ax[i].legend()
ax[-1].set_xlabel("Time (s)")
fig.tight_layout()
buffer = io.BytesIO()
fig.savefig(buffer, format='png')
buffer.seek(0)
f.write(f"<img src='data:image/png;base64,{base64.b64encode(buffer.getvalue()).decode()}' style='width:100%; max-width:800px;'>\n")
print(f"\nReport written to {output_fn}\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A tool for longitudinal control testing.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--desc', help="Extra description to include in report.")
parser.add_argument('--output', help="Write out report to this file.", default=None)
args = parser.parse_args()
assert args.output is None or args.output.endswith(".html"), "Output filename must end with '.html'"
main(args)

View File

@ -1,35 +1,57 @@
from contextlib import contextmanager
import time
from contextlib import AbstractContextManager
from panda import Panda
from opendbc.car.car_helpers import get_car
from opendbc.car.can_definitions import CanData
from opendbc.car.structs import CarControl
@contextmanager
def PandaRunner():
p = Panda()
class PandaRunner(AbstractContextManager):
def __enter__(self):
self.p = Panda()
self.p.reset()
def _can_recv(wait_for_one: bool = False) -> list[list[CanData]]:
recv = p.can_recv()
# setup + fingerprinting
self.p.set_safety_mode(Panda.SAFETY_ELM327, 1)
self.CI = get_car(self._can_recv, self.p.can_send_many, self.p.set_obd, True)
print("fingerprinted", self.CI.CP.carName)
assert self.CI.CP.carFingerprint != "mock", "Unable to identify car. Check connections and ensure car is supported."
self.p.set_safety_mode(Panda.SAFETY_ELM327, 1)
self.CI.init(self.CI.CP, self._can_recv, self.p.can_send_many)
self.p.set_safety_mode(Panda.SAFETY_TOYOTA, self.CI.CP.safetyConfigs[0].safetyParam)
return self
def __exit__(self, exc_type, exc_value, traceback):
self.p.set_safety_mode(Panda.SAFETY_NOOUTPUT)
self.p.reset() # avoid siren
return super().__exit__(exc_type, exc_value, traceback)
@property
def panda(self) -> Panda:
return self.p
def _can_recv(self, wait_for_one: bool = False) -> list[list[CanData]]:
recv = self.p.can_recv()
while len(recv) == 0 and wait_for_one:
recv = p.can_recv()
recv = self.p.can_recv()
return [[CanData(addr, dat, bus) for addr, dat, bus in recv], ]
try:
# setup + fingerprinting
p.set_safety_mode(Panda.SAFETY_ELM327, 1)
CI = get_car(_can_recv, p.can_send_many, p.set_obd, True)
print("fingerprinted", CI.CP.carName)
assert CI.CP.carFingerprint != "mock", "Unable to identify car. Check connections and ensure car is supported."
p.set_safety_mode(Panda.SAFETY_ELM327, 1)
CI.init(CI.CP, _can_recv, p.can_send_many)
p.set_safety_mode(Panda.SAFETY_TOYOTA, CI.CP.safetyConfigs[0].safetyParam)
yield p, CI
finally:
p.set_safety_mode(Panda.SAFETY_NOOUTPUT)
def read(self, strict: bool = True):
cs = self.CI.update([int(time.monotonic()*1e9), self._can_recv()[0]])
if strict:
assert cs.canValid, "CAN went invalid, check connections"
return cs
def write(self, cc: CarControl) -> None:
if cc.enabled and not self.p.health()['controls_allowed']:
# prevent the car from faulting. print a warning?
cc = CarControl(enabled=False)
_, can_sends = self.CI.apply(cc)
self.p.can_send_many(can_sends, timeout=25)
self.p.send_heartbeat()
if __name__ == "__main__":
with PandaRunner() as (p, CI):
print(p.can_recv())
with PandaRunner() as p:
print(p.read())

View File

@ -35,6 +35,7 @@ docs = [
]
examples = [
"inputs",
"matplotlib",
]
[tool.pytest.ini_options]