From e7695c3f32e95bbd0ad31e2ef09075c93ba9f65b Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Wed, 21 Feb 2024 11:12:56 -0800 Subject: [PATCH] add power cycling to can_replay --- tools/replay/can_replay.py | 56 ++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/tools/replay/can_replay.py b/tools/replay/can_replay.py index ef6864f110..856dd80d49 100755 --- a/tools/replay/can_replay.py +++ b/tools/replay/can_replay.py @@ -12,43 +12,47 @@ from openpilot.selfdrive.boardd.boardd import can_capnp_to_can_list from openpilot.tools.lib.logreader import LogReader from panda import Panda, PandaJungle -def send_thread(s, flock): - if "Jungle" in str(type(s)): - if "FLASH" in os.environ: - with flock: - s.flash() +# set both to cycle power or ignition +PWR_ON = int(os.getenv("PWR_ON", "0")) +PWR_OFF = int(os.getenv("PWR_OFF", "0")) +IGN_ON = int(os.getenv("ON", "0")) +IGN_OFF = int(os.getenv("OFF", "0")) +ENABLE_IGN = IGN_ON > 0 and IGN_OFF > 0 +ENABLE_PWR = PWR_ON > 0 and PWR_OFF > 0 - for i in [0, 1, 2, 3, 0xFFFF]: - s.can_clear(i) - s.set_can_speed_kbps(i, 500) - s.set_can_data_speed_kbps(i, 500) - s.set_ignition(False) - time.sleep(5) - s.set_ignition(True) - s.set_panda_power(True) - else: - s.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + +def send_thread(s, flock): + if "FLASH" in os.environ: + with flock: + s.flash() + + for i in [0, 1, 2, 3, 0xFFFF]: + s.can_clear(i) + s.set_can_speed_kbps(i, 500) + s.set_can_data_speed_kbps(i, 500) + s.set_ignition(False) + time.sleep(5) + s.set_ignition(True) + s.set_panda_power(True) s.set_can_loopback(False) - idx = 0 - ign = True rk = Ratekeeper(1 / DT_CTRL, print_delay_threshold=None) while True: - # handle ignition cycling + # handle cycling + if ENABLE_PWR: + i = (rk.frame*DT_CTRL) % (PWR_ON + PWR_OFF) < PWR_ON + s.set_panda_power(i) if ENABLE_IGN: i = (rk.frame*DT_CTRL) % (IGN_ON + IGN_OFF) < IGN_ON - if i != ign: - ign = i - s.set_ignition(ign) + s.set_ignition(i) - snd = CAN_MSGS[idx] + snd = CAN_MSGS[rk.frame % len(CAN_MSGS)] snd = list(filter(lambda x: x[-1] <= 2, snd)) try: s.can_send_many(snd) except usb1.USBErrorTimeout: # timeout is fine, just means the CAN TX buffer is full pass - idx = (idx + 1) % len(CAN_MSGS) # Drain panda message buffer s.can_recv() @@ -98,10 +102,8 @@ if __name__ == "__main__": print("Finished loading...") - # set both to cycle ignition - IGN_ON = int(os.getenv("ON", "0")) - IGN_OFF = int(os.getenv("OFF", "0")) - ENABLE_IGN = IGN_ON > 0 and IGN_OFF > 0 + if ENABLE_PWR: + print(f"Cycling power: on for {PWR_ON}s, off for {PWR_OFF}s") if ENABLE_IGN: print(f"Cycling ignition: on for {IGN_ON}s, off for {IGN_OFF}s")