mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-04-07 10:24:02 +08:00
* lateral report * mutually exclude buttons * gating * set maneuver * add timer * timer text * fix plot * use curvature * more curves * fix gating * rm delay * highway speed only * msg * add sine * add step-down * use relative * text * stabilize * tuning * windup * text * winddown * no windup * tuning * more tuning * more * formatting * test faster * extend sine * report crossings * add readme * clean report * fix lint * gating * fix * straighter * compensate roll * rm abs roll * len * Revert "rm abs roll" This reverts commit a22d6bb136f90d2bf997e6b9aeee2f784398ef42. * Revert "compensate roll" This reverts commit dfda52119cc4a2e29ac2854b9154c08459086fea. * print actuators * show curve and roll * tune roll * text * slower * timer * too much banked streets in US * readme * filter incomplete * plot jerk * plot angle jerk * lil edits * fix lint * apply suggestions * better table * apply comments * clean * shane comments * deflicker --------- Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
181 lines
6.2 KiB
Python
Executable File
181 lines
6.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import numpy as np
|
|
from dataclasses import dataclass
|
|
|
|
from cereal import messaging, car
|
|
from openpilot.common.constants import CV
|
|
from openpilot.common.realtime import DT_MDL
|
|
from openpilot.common.params import Params
|
|
from openpilot.common.swaglog import cloudlog
|
|
from openpilot.selfdrive.controls.lib.drive_helpers import MIN_SPEED
|
|
from openpilot.tools.longitudinal_maneuvers.maneuversd import Action, Maneuver as _Maneuver
|
|
|
|
# thresholds for starting maneuvers
|
|
MAX_SPEED_DEV = 0.7 # deviation in m/s
|
|
MAX_CURV = 0.002 # 500 m radius
|
|
MAX_ROLL = 0.08 # 4.56°
|
|
TIMER = 2.0 # sec stable conditions before starting maneuver
|
|
|
|
@dataclass
|
|
class Maneuver(_Maneuver):
|
|
_baseline_curvature: float = 0.0
|
|
|
|
def get_accel(self, v_ego: float, lat_active: bool, curvature: float, roll: float) -> float:
|
|
self._run_completed = False
|
|
# only start maneuver on straight, flat roads
|
|
ready = abs(v_ego - self.initial_speed) < MAX_SPEED_DEV and lat_active and abs(curvature) < MAX_CURV and abs(roll) < MAX_ROLL
|
|
self._ready_cnt = (self._ready_cnt + 1) if ready else max(self._ready_cnt - 1, 0)
|
|
|
|
if self._ready_cnt > (TIMER / DT_MDL):
|
|
if not self._active:
|
|
self._baseline_curvature = curvature
|
|
self._active = True
|
|
|
|
if not self._active:
|
|
return 0.0
|
|
|
|
return self._step()
|
|
|
|
def reset(self):
|
|
super().reset()
|
|
self._ready_cnt = 0
|
|
|
|
|
|
def _sine_action(amplitude, period, duration):
|
|
t = np.linspace(0, duration, int(duration / DT_MDL) + 1)
|
|
a = amplitude * np.sin(2 * np.pi * t / period)
|
|
return Action(a.tolist(), t.tolist())
|
|
|
|
|
|
MANEUVERS = [
|
|
Maneuver(
|
|
"step right 20mph",
|
|
[Action([0.5], [1.0]), Action([-0.5], [1.5])],
|
|
repeat=2,
|
|
initial_speed=20. * CV.MPH_TO_MS,
|
|
),
|
|
Maneuver(
|
|
"step left 20mph",
|
|
[Action([-0.5], [1.0]), Action([0.5], [1.5])],
|
|
repeat=2,
|
|
initial_speed=20. * CV.MPH_TO_MS,
|
|
),
|
|
Maneuver(
|
|
"sine 0.5Hz 20mph",
|
|
[_sine_action(1.0, 2.0, 2.0), Action([0.0], [0.5])],
|
|
repeat=2,
|
|
initial_speed=20. * CV.MPH_TO_MS,
|
|
),
|
|
Maneuver(
|
|
"step right 30mph",
|
|
[Action([0.5], [1.0]), Action([-0.5], [1.5])],
|
|
repeat=2,
|
|
initial_speed=30. * CV.MPH_TO_MS,
|
|
),
|
|
Maneuver(
|
|
"step left 30mph",
|
|
[Action([-0.5], [1.0]), Action([0.5], [1.5])],
|
|
repeat=2,
|
|
initial_speed=30. * CV.MPH_TO_MS,
|
|
),
|
|
Maneuver(
|
|
"sine 0.5Hz 30mph",
|
|
[_sine_action(1.0, 2.0, 2.0), Action([0.0], [0.5])],
|
|
repeat=2,
|
|
initial_speed=30. * CV.MPH_TO_MS,
|
|
),
|
|
]
|
|
|
|
|
|
def main():
|
|
params = Params()
|
|
cloudlog.info("lateral_maneuversd is waiting for CarParams")
|
|
messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams)
|
|
|
|
sm = messaging.SubMaster(['carState', 'carControl', 'controlsState', 'selfdriveState', 'modelV2'], poll='modelV2')
|
|
pm = messaging.PubMaster(['lateralManeuverPlan', 'alertDebug'])
|
|
|
|
maneuvers = iter(MANEUVERS)
|
|
maneuver = None
|
|
complete_cnt = 0
|
|
display_holdoff = 0
|
|
prev_text = ''
|
|
|
|
while True:
|
|
sm.update()
|
|
|
|
if maneuver is None:
|
|
maneuver = next(maneuvers, None)
|
|
|
|
alert_msg = messaging.new_message('alertDebug')
|
|
alert_msg.valid = True
|
|
|
|
plan_send = messaging.new_message('lateralManeuverPlan')
|
|
|
|
accel = 0
|
|
v_ego = max(sm['carState'].vEgo, 0)
|
|
curvature = sm['controlsState'].desiredCurvature
|
|
|
|
if complete_cnt > 0:
|
|
complete_cnt -= 1
|
|
alert_msg.alertDebug.alertText1 = 'Completed'
|
|
alert_msg.alertDebug.alertText2 = maneuver.description
|
|
elif maneuver is not None:
|
|
# reset maneuver on steering override or out of range speed
|
|
if sm['carState'].steeringPressed or (maneuver.active and abs(v_ego - maneuver.initial_speed) > MAX_SPEED_DEV):
|
|
maneuver.reset()
|
|
|
|
roll = sm['carControl'].orientationNED[0] if len(sm['carControl'].orientationNED) == 3 else 0.0
|
|
accel = maneuver.get_accel(v_ego, sm['carControl'].latActive, curvature, roll)
|
|
|
|
if maneuver._run_completed:
|
|
complete_cnt = int(1.0 / DT_MDL)
|
|
alert_msg.alertDebug.alertText1 = 'Complete'
|
|
alert_msg.alertDebug.alertText2 = maneuver.description
|
|
elif maneuver.active:
|
|
action_remaining = maneuver.actions[maneuver._action_index].time_bp[-1] - maneuver._action_frames * DT_MDL
|
|
if maneuver.description.startswith('sine'):
|
|
freq = maneuver.description.split()[1]
|
|
alert_msg.alertDebug.alertText1 = f'Active sine {freq} {max(action_remaining, 0):.1f}s'
|
|
else:
|
|
alert_msg.alertDebug.alertText1 = f'Active {accel:+.1f}m/s² {max(action_remaining, 0):.1f}s'
|
|
alert_msg.alertDebug.alertText2 = maneuver.description
|
|
elif not (abs(v_ego - maneuver.initial_speed) < MAX_SPEED_DEV and sm['carControl'].latActive):
|
|
alert_msg.alertDebug.alertText1 = f'Set speed to {maneuver.initial_speed * CV.MS_TO_MPH:0.0f} mph'
|
|
elif maneuver._ready_cnt > 0:
|
|
ready_time = max(TIMER - maneuver._ready_cnt * DT_MDL, 0)
|
|
alert_msg.alertDebug.alertText1 = f'Starting: {int(ready_time) + 1}'
|
|
alert_msg.alertDebug.alertText2 = maneuver.description
|
|
else:
|
|
curv_ok = abs(curvature) < MAX_CURV
|
|
reason = 'road not straight' if not curv_ok else 'road not flat'
|
|
alert_msg.alertDebug.alertText1 = f'Waiting: {reason}'
|
|
alert_msg.alertDebug.alertText2 = maneuver.description
|
|
else:
|
|
alert_msg.alertDebug.alertText1 = 'Maneuvers Finished'
|
|
|
|
# prevent flickering text
|
|
setup = ('Set speed', 'Starting', 'Waiting')
|
|
text = alert_msg.alertDebug.alertText1
|
|
same = text == prev_text or (text.startswith('Starting') and prev_text.startswith('Starting'))
|
|
if not same and text.startswith(setup) and prev_text.startswith(setup) and display_holdoff > 0:
|
|
alert_msg.alertDebug.alertText1 = prev_text
|
|
display_holdoff -= 1
|
|
else:
|
|
prev_text = text
|
|
display_holdoff = int(0.5 / DT_MDL) if text.startswith(setup) else 0
|
|
|
|
pm.send('alertDebug', alert_msg)
|
|
|
|
plan_send.valid = maneuver is not None and maneuver.active and complete_cnt == 0
|
|
if plan_send.valid:
|
|
plan_send.lateralManeuverPlan.desiredCurvature = maneuver._baseline_curvature + accel / max(v_ego, MIN_SPEED) ** 2
|
|
pm.send('lateralManeuverPlan', plan_send)
|
|
|
|
if maneuver is not None and maneuver.finished and complete_cnt == 0:
|
|
maneuver = None
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|