mirror of https://github.com/commaai/panda.git
WIP: enhance safety replay test (#211)
* safety replay: add msg length * add support for individual segments to safety replay * safety replay: extend segment support to rest of safety modes * safety replay: add debug flag * safety replay: return if no steering msgs in segment mode * minor refactor of safety replay
This commit is contained in:
parent
00ebc16936
commit
19d1e41eed
|
@ -37,7 +37,7 @@ bool get_long_controls_allowed(void);
|
|||
void set_gas_interceptor_detected(bool c);
|
||||
bool get_gas_interceptor_detetcted(void);
|
||||
int get_gas_interceptor_prev(void);
|
||||
void set_timer(int t);
|
||||
void set_timer(uint32_t t);
|
||||
void reset_angle_control(void);
|
||||
|
||||
void safety_rx_hook(CAN_FIFOMailBox_TypeDef *to_send);
|
||||
|
|
|
@ -80,7 +80,7 @@ int get_gas_interceptor_prev(void){
|
|||
return gas_interceptor_prev;
|
||||
}
|
||||
|
||||
void set_timer(int t){
|
||||
void set_timer(uint32_t t){
|
||||
timer.CNT = t;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
import struct
|
||||
import panda.tests.safety.libpandasafety_py as libpandasafety_py
|
||||
|
||||
safety_modes = {
|
||||
"NOOUTPUT": 0,
|
||||
"HONDA": 1,
|
||||
"TOYOTA": 2,
|
||||
"GM": 3,
|
||||
"HONDA_BOSCH": 4,
|
||||
"FORD": 5,
|
||||
"CADILLAC": 6,
|
||||
"HYUNDAI": 7,
|
||||
"TESLA": 8,
|
||||
"CHRYSLER": 9,
|
||||
"SUBARU": 10,
|
||||
"GM_ASCM": 0x1334,
|
||||
"TOYOTA_IPAS": 0x1335,
|
||||
"ALLOUTPUT": 0x1337,
|
||||
"ELM327": 0xE327
|
||||
}
|
||||
|
||||
def to_signed(d, bits):
|
||||
ret = d
|
||||
if d >= (1 << (bits - 1)):
|
||||
ret = d - (1 << bits)
|
||||
return ret
|
||||
|
||||
def is_steering_msg(mode, addr):
|
||||
ret = False
|
||||
if mode == safety_modes["HONDA"] or mode == safety_modes["HONDA_BOSCH"]:
|
||||
ret = (addr == 0xE4) or (addr == 0x194) or (addr == 0x33D)
|
||||
elif mode == safety_modes["TOYOTA"]:
|
||||
ret = addr == 0x2E4
|
||||
elif mode == safety_modes["GM"]:
|
||||
ret = addr == 384
|
||||
elif mode == safety_modes["HYUNDAI"]:
|
||||
ret = addr == 832
|
||||
elif mode == safety_modes["CHRYSLER"]:
|
||||
ret = addr == 0x292
|
||||
elif mode == safety_modes["SUBARU"]:
|
||||
ret = addr == 0x122
|
||||
return ret
|
||||
|
||||
def get_steer_torque(mode, to_send):
|
||||
ret = 0
|
||||
if mode == safety_modes["HONDA"] or mode == safety_modes["HONDA_BOSCH"]:
|
||||
ret = to_send.RDLR & 0xFFFF0000
|
||||
elif mode == safety_modes["TOYOTA"]:
|
||||
ret = (to_send.RDLR & 0xFF00) | ((to_send.RDLR >> 16) & 0xFF)
|
||||
ret = to_signed(ret, 16)
|
||||
elif mode == safety_modes["GM"]:
|
||||
ret = ((to_send.RDLR & 0x7) << 8) + ((to_send.RDLR & 0xFF00) >> 8)
|
||||
ret = to_signed(ret, 11)
|
||||
elif mode == safety_modes["HYUNDAI"]:
|
||||
ret = ((to_send.RDLR >> 16) & 0x7ff) - 1024
|
||||
elif mode == safety_modes["CHRYSLER"]:
|
||||
ret = ((to_send.RDLR & 0x7) << 8) + ((to_send.RDLR & 0xFF00) >> 8) - 1024
|
||||
elif mode == safety_modes["SUBARU"]:
|
||||
ret = ((to_send.RDLR >> 16) & 0x1FFF)
|
||||
ret = to_signed(ret, 13)
|
||||
return ret
|
||||
|
||||
def set_desired_torque_last(safety, mode, torque):
|
||||
if mode == safety_modes["HONDA"] or mode == safety_modes["HONDA_BOSCH"]:
|
||||
pass # honda safety mode doesn't enforce a rate on steering msgs
|
||||
elif mode == safety_modes["TOYOTA"]:
|
||||
safety.set_toyota_desired_torque_last(torque)
|
||||
elif mode == safety_modes["GM"]:
|
||||
safety.set_gm_desired_torque_last(torque)
|
||||
elif mode == safety_modes["HYUNDAI"]:
|
||||
safety.set_hyundai_desired_torque_last(torque)
|
||||
elif mode == safety_modes["CHRYSLER"]:
|
||||
safety.set_chrysler_desired_torque_last(torque)
|
||||
elif mode == safety_modes["SUBARU"]:
|
||||
safety.set_subaru_desired_torque_last(torque)
|
||||
|
||||
def package_can_msg(msg):
|
||||
addr_shift = 3 if msg.address >= 0x800 else 21
|
||||
rdlr, rdhr = struct.unpack('II', msg.dat.ljust(8, b'\x00'))
|
||||
|
||||
ret = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
ret[0].RIR = msg.address << addr_shift
|
||||
ret[0].RDTR = len(msg.dat) | ((msg.src & 0xF) << 4)
|
||||
ret[0].RDHR = rdhr
|
||||
ret[0].RDLR = rdlr
|
||||
|
||||
return ret
|
||||
|
||||
def init_segment(safety, lr, mode):
|
||||
sendcan = (msg for msg in lr if msg.which() == 'sendcan')
|
||||
steering_msgs = (can for msg in sendcan for can in msg.sendcan if is_steering_msg(mode, can.address))
|
||||
|
||||
msg = next(steering_msgs, None)
|
||||
if msg is None:
|
||||
# no steering msgs
|
||||
return
|
||||
|
||||
to_send = package_can_msg(msg)
|
||||
torque = get_steer_torque(mode, to_send)
|
||||
if torque != 0:
|
||||
safety.set_controls_allowed(1)
|
||||
set_desired_torque_last(safety, mode, torque)
|
||||
assert safety.safety_tx_hook(to_send), "failed to initialize panda safety for segment"
|
||||
|
|
@ -1,28 +1,13 @@
|
|||
#!/usr/bin/env python2
|
||||
|
||||
import os
|
||||
import sys
|
||||
import struct
|
||||
import panda.tests.safety.libpandasafety_py as libpandasafety_py
|
||||
from panda.tests.safety_replay.helpers import is_steering_msg, get_steer_torque, \
|
||||
set_desired_torque_last, package_can_msg, \
|
||||
init_segment, safety_modes
|
||||
from openpilot_tools.lib.logreader import LogReader
|
||||
|
||||
safety_modes = {
|
||||
"NOOUTPUT": 0,
|
||||
"HONDA": 1,
|
||||
"TOYOTA": 2,
|
||||
"GM": 3,
|
||||
"HONDA_BOSCH": 4,
|
||||
"FORD": 5,
|
||||
"CADILLAC": 6,
|
||||
"HYUNDAI": 7,
|
||||
"TESLA": 8,
|
||||
"CHRYSLER": 9,
|
||||
"SUBARU": 10,
|
||||
"GM_ASCM": 0x1334,
|
||||
"TOYOTA_IPAS": 0x1335,
|
||||
"ALLOUTPUT": 0x1337,
|
||||
"ELM327": 0xE327
|
||||
}
|
||||
|
||||
# replay a drive to check for safety violations
|
||||
def replay_drive(lr, safety_mode, param):
|
||||
safety = libpandasafety_py.libpandasafety
|
||||
|
@ -30,28 +15,29 @@ def replay_drive(lr, safety_mode, param):
|
|||
err = safety.safety_set_mode(safety_mode, param)
|
||||
assert err == 0, "invalid safety mode: %d" % safety_mode
|
||||
|
||||
if "SEGMENT" in os.environ:
|
||||
init_segment(safety, lr, mode)
|
||||
|
||||
tx_tot, tx_blocked, tx_controls, tx_controls_blocked = 0, 0, 0, 0
|
||||
blocked_addrs = set()
|
||||
start_t = None
|
||||
|
||||
for msg in lr:
|
||||
safety.set_timer((msg.logMonoTime & 0xFFFFFFFF)/1000)
|
||||
if start_t is None:
|
||||
start_t = msg.logMonoTime
|
||||
safety.set_timer(((msg.logMonoTime / 1000)) % 0xFFFFFFFF)
|
||||
|
||||
if msg.which() == 'sendcan':
|
||||
for canmsg in msg.sendcan:
|
||||
# handle extended addresses
|
||||
addr_shift = 3 if canmsg.address << 21 > 0xFFFFFFFF else 21
|
||||
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = canmsg.address << addr_shift
|
||||
to_send[0].RDTR = (canmsg.src & 0xF) << 4
|
||||
to_send[0].RDHR = struct.unpack('<I', canmsg.dat.ljust(8, '\x00')[4:])[0]
|
||||
to_send[0].RDLR = struct.unpack('<I', canmsg.dat.ljust(8, '\x00')[:4])[0]
|
||||
|
||||
for canmsg in msg.sendcan:
|
||||
to_send = package_can_msg(canmsg)
|
||||
sent = safety.safety_tx_hook(to_send)
|
||||
if not sent:
|
||||
tx_blocked += 1
|
||||
tx_controls_blocked += safety.get_controls_allowed()
|
||||
blocked_addrs.add(canmsg.address)
|
||||
|
||||
if "DEBUG" in os.environ:
|
||||
print "blocked %d at %f" % (canmsg.address, (msg.logMonoTime - start_t)/(1e9))
|
||||
tx_controls += safety.get_controls_allowed()
|
||||
tx_tot += 1
|
||||
elif msg.which() == 'can':
|
||||
|
@ -59,15 +45,7 @@ def replay_drive(lr, safety_mode, param):
|
|||
# ignore msgs we sent
|
||||
if canmsg.src >= 128:
|
||||
continue
|
||||
|
||||
# handle extended addresses
|
||||
addr_shift = 3 if canmsg.address << 21 > 0xFFFFFFFF else 21
|
||||
|
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_push[0].RIR = canmsg.address << addr_shift
|
||||
to_push[0].RDTR = (canmsg.src & 0xF) << 4
|
||||
to_push[0].RDHR = struct.unpack('<I', canmsg.dat.ljust(8, '\x00')[4:])[0]
|
||||
to_push[0].RDLR = struct.unpack('<I', canmsg.dat.ljust(8, '\x00')[:4])[0]
|
||||
to_push = package_can_msg(canmsg)
|
||||
safety.safety_rx_hook(to_push)
|
||||
|
||||
print "total openpilot msgs:", tx_tot
|
||||
|
@ -89,3 +67,4 @@ if __name__ == "__main__":
|
|||
print "replaying drive %s with safety mode %d and param %d" % (sys.argv[1], mode, param)
|
||||
|
||||
replay_drive(lr, mode, param)
|
||||
|
||||
|
|
|
@ -3,7 +3,8 @@
|
|||
import os
|
||||
import requests
|
||||
|
||||
from replay_drive import replay_drive, safety_modes
|
||||
from helpers import safety_modes
|
||||
from replay_drive import replay_drive
|
||||
from openpilot_tools.lib.logreader import LogReader
|
||||
|
||||
BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/"
|
||||
|
|
Loading…
Reference in New Issue