can_replay: add type hint (#31601)

type hint
old-commit-hash: edd26acdc3
This commit is contained in:
Justin Newberry 2024-02-26 19:26:01 -05:00 committed by GitHub
parent c799790b9b
commit d036050624
1 changed files with 13 additions and 13 deletions

View File

@ -21,41 +21,41 @@ ENABLE_IGN = IGN_ON > 0 and IGN_OFF > 0
ENABLE_PWR = PWR_ON > 0 and PWR_OFF > 0
def send_thread(s, flock):
def send_thread(j: PandaJungle, flock):
if "FLASH" in os.environ:
with flock:
s.flash()
j.flash()
for i in [0, 1, 2, 3, 0xFFFF]:
s.can_clear(i)
s.set_can_speed_kbps(i, 500)
s.set_can_data_speed_kbps(i, 500)
s.set_ignition(False)
j.can_clear(i)
j.set_can_speed_kbps(i, 500)
j.set_can_data_speed_kbps(i, 500)
j.set_ignition(False)
time.sleep(5)
s.set_ignition(True)
s.set_panda_power(True)
s.set_can_loopback(False)
j.set_ignition(True)
j.set_panda_power(True)
j.set_can_loopback(False)
rk = Ratekeeper(1 / DT_CTRL, print_delay_threshold=None)
while True:
# handle cycling
if ENABLE_PWR:
i = (rk.frame*DT_CTRL) % (PWR_ON + PWR_OFF) < PWR_ON
s.set_panda_power(i)
j.set_panda_power(i)
if ENABLE_IGN:
i = (rk.frame*DT_CTRL) % (IGN_ON + IGN_OFF) < IGN_ON
s.set_ignition(i)
j.set_ignition(i)
snd = CAN_MSGS[rk.frame % len(CAN_MSGS)]
snd = list(filter(lambda x: x[-1] <= 2, snd))
try:
s.can_send_many(snd)
j.can_send_many(snd)
except usb1.USBErrorTimeout:
# timeout is fine, just means the CAN TX buffer is full
pass
# Drain panda message buffer
s.can_recv()
j.can_recv()
rk.keep_time()