mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-26 08:53:53 +08:00
Merge panda subtree
This commit is contained in:
@@ -1,11 +1,34 @@
|
||||
from .helpers import test_all_pandas, panda_connect_and_init
|
||||
import os
|
||||
|
||||
from nose.tools import assert_equal
|
||||
|
||||
from panda import Panda, BASEDIR
|
||||
from .helpers import reset_pandas, test_all_pandas, panda_connect_and_init
|
||||
|
||||
|
||||
# Reset the pandas before flashing them
|
||||
def aaaa_reset_before_tests():
|
||||
reset_pandas()
|
||||
|
||||
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
def test_recover(p):
|
||||
assert p.recover(timeout=30)
|
||||
|
||||
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
def test_flash(p):
|
||||
p.flash()
|
||||
|
||||
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
def test_get_signature(p):
|
||||
fn = os.path.join(BASEDIR, "board/obj/panda.bin")
|
||||
|
||||
firmware_sig = Panda.get_signature_from_firmware(fn)
|
||||
panda_sig = p.get_signature()
|
||||
|
||||
assert_equal(panda_sig, firmware_sig)
|
||||
|
||||
44
panda/tests/automated/2_health.py
Normal file
44
panda/tests/automated/2_health.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import time
|
||||
from panda_jungle import PandaJungle # pylint: disable=import-error
|
||||
from .helpers import panda_jungle, reset_pandas, test_all_pandas, test_all_gen2_pandas, panda_connect_and_init
|
||||
|
||||
# Reset the pandas before running tests
|
||||
def aaaa_reset_before_tests():
|
||||
reset_pandas()
|
||||
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
def test_ignition(p):
|
||||
try:
|
||||
# Set harness orientation to #2, since the ignition line is on the wrong SBU bus :/
|
||||
panda_jungle.set_harness_orientation(PandaJungle.HARNESS_ORIENTATION_2)
|
||||
reset_pandas()
|
||||
p.reconnect()
|
||||
panda_jungle.set_ignition(False)
|
||||
time.sleep(2)
|
||||
assert p.health()['ignition_line'] == False
|
||||
panda_jungle.set_ignition(True)
|
||||
time.sleep(2)
|
||||
assert p.health()['ignition_line'] == True
|
||||
finally:
|
||||
panda_jungle.set_harness_orientation(PandaJungle.HARNESS_ORIENTATION_1)
|
||||
|
||||
@test_all_gen2_pandas
|
||||
@panda_connect_and_init
|
||||
def test_orientation_detection(p):
|
||||
seen_orientations = []
|
||||
for i in range(3):
|
||||
panda_jungle.set_harness_orientation(i)
|
||||
reset_pandas()
|
||||
p.reconnect()
|
||||
detected_harness_orientation = p.health()['car_harness_status']
|
||||
if (i == 0 and detected_harness_orientation != 0) or detected_harness_orientation in seen_orientations:
|
||||
assert False
|
||||
seen_orientations.append(detected_harness_orientation)
|
||||
|
||||
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
def test_voltage(p):
|
||||
voltage = p.health()['voltage']
|
||||
assert ((voltage > 10000) and (voltage < 14000))
|
||||
@@ -2,11 +2,18 @@ import sys
|
||||
import time
|
||||
from panda import Panda
|
||||
from nose.tools import assert_equal, assert_less, assert_greater
|
||||
from .helpers import SPEED_NORMAL, SPEED_GMLAN, time_many_sends, test_white_and_grey, panda_type_to_serial, test_all_pandas, panda_connect_and_init
|
||||
from .helpers import start_heartbeat_thread, reset_pandas, SPEED_NORMAL, SPEED_GMLAN, time_many_sends, test_white_and_grey, panda_type_to_serial, test_all_pandas, panda_connect_and_init
|
||||
|
||||
# Reset the pandas before running tests
|
||||
def aaaa_reset_before_tests():
|
||||
reset_pandas()
|
||||
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
def test_can_loopback(p):
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
# enable output mode
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
|
||||
@@ -40,8 +47,11 @@ def test_can_loopback(p):
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
def test_safety_nooutput(p):
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
# enable output mode
|
||||
p.set_safety_mode(Panda.SAFETY_NOOUTPUT)
|
||||
p.set_safety_mode(Panda.SAFETY_SILENT)
|
||||
|
||||
# enable CAN loopback mode
|
||||
p.set_can_loopback(True)
|
||||
@@ -60,6 +70,9 @@ def test_reliability(p):
|
||||
LOOP_COUNT = 100
|
||||
MSG_COUNT = 100
|
||||
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
# enable output mode
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
p.set_can_loopback(True)
|
||||
@@ -95,6 +108,9 @@ def test_reliability(p):
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
def test_throughput(p):
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
# enable output mode
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
|
||||
@@ -122,6 +138,9 @@ def test_gmlan(p):
|
||||
if p.legacy:
|
||||
return
|
||||
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
# enable output mode
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
|
||||
@@ -153,6 +172,9 @@ def test_gmlan_bad_toggle(p):
|
||||
if p.legacy:
|
||||
return
|
||||
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
# enable output mode
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
import time
|
||||
from panda import Panda
|
||||
from .helpers import connect_wifi, test_white, test_all_pandas, panda_type_to_serial, panda_connect_and_init
|
||||
from .helpers import reset_pandas, connect_wifi, test_white, test_all_pandas, panda_type_to_serial, panda_connect_and_init
|
||||
import requests
|
||||
|
||||
# Reset the pandas before running tests
|
||||
def aaaa_reset_before_tests():
|
||||
reset_pandas()
|
||||
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
def test_get_serial(p):
|
||||
@@ -1,6 +1,10 @@
|
||||
import time
|
||||
from panda import Panda
|
||||
from .helpers import time_many_sends, connect_wifi, test_white, panda_type_to_serial
|
||||
from .helpers import start_heartbeat_thread, reset_pandas, time_many_sends, connect_wifi, test_white, panda_type_to_serial
|
||||
|
||||
# Reset the pandas before running tests
|
||||
def aaaa_reset_before_tests():
|
||||
reset_pandas()
|
||||
|
||||
@test_white
|
||||
@panda_type_to_serial
|
||||
@@ -16,6 +20,9 @@ def test_throughput(serials=None):
|
||||
connect_wifi(serials[0])
|
||||
p = Panda(serials[0])
|
||||
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
# enable output mode
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
|
||||
@@ -43,6 +50,10 @@ def test_throughput(serials=None):
|
||||
def test_recv_only(serials=None):
|
||||
connect_wifi(serials[0])
|
||||
p = Panda(serials[0])
|
||||
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
|
||||
p.set_can_loopback(True)
|
||||
@@ -1,195 +0,0 @@
|
||||
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
from panda import Panda
|
||||
from nose.tools import assert_equal, assert_less, assert_greater
|
||||
from .helpers import time_many_sends, test_two_panda, test_two_black_panda, panda_type_to_serial, clear_can_buffers, panda_connect_and_init
|
||||
|
||||
@test_two_panda
|
||||
@panda_type_to_serial
|
||||
@panda_connect_and_init
|
||||
def test_send_recv(p_send, p_recv):
|
||||
p_send.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
p_recv.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
p_send.set_can_loopback(False)
|
||||
p_recv.set_can_loopback(False)
|
||||
|
||||
assert not p_send.legacy
|
||||
assert not p_recv.legacy
|
||||
|
||||
p_send.can_send_many([(0x1ba, 0, b"message", 0)]*2)
|
||||
time.sleep(0.05)
|
||||
p_recv.can_recv()
|
||||
p_send.can_recv()
|
||||
|
||||
busses = [0,1,2]
|
||||
|
||||
for bus in busses:
|
||||
for speed in [100, 250, 500, 750, 1000]:
|
||||
p_send.set_can_speed_kbps(bus, speed)
|
||||
p_recv.set_can_speed_kbps(bus, speed)
|
||||
time.sleep(0.05)
|
||||
|
||||
comp_kbps = time_many_sends(p_send, bus, p_recv, two_pandas=True)
|
||||
|
||||
saturation_pct = (comp_kbps/speed) * 100.0
|
||||
assert_greater(saturation_pct, 80)
|
||||
assert_less(saturation_pct, 100)
|
||||
|
||||
print("two pandas bus {}, 100 messages at speed {:4d}, comp speed is {:7.2f}, percent {:6.2f}".format(bus, speed, comp_kbps, saturation_pct))
|
||||
|
||||
@test_two_panda
|
||||
@panda_type_to_serial
|
||||
@panda_connect_and_init
|
||||
def test_latency(p_send, p_recv):
|
||||
p_send.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
p_recv.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
p_send.set_can_loopback(False)
|
||||
p_recv.set_can_loopback(False)
|
||||
|
||||
assert not p_send.legacy
|
||||
assert not p_recv.legacy
|
||||
|
||||
p_send.set_can_speed_kbps(0, 100)
|
||||
p_recv.set_can_speed_kbps(0, 100)
|
||||
time.sleep(0.05)
|
||||
|
||||
p_send.can_send_many([(0x1ba, 0, b"testmsg", 0)]*10)
|
||||
time.sleep(0.05)
|
||||
p_recv.can_recv()
|
||||
p_send.can_recv()
|
||||
|
||||
busses = [0,1,2]
|
||||
|
||||
for bus in busses:
|
||||
for speed in [100, 250, 500, 750, 1000]:
|
||||
p_send.set_can_speed_kbps(bus, speed)
|
||||
p_recv.set_can_speed_kbps(bus, speed)
|
||||
time.sleep(0.1)
|
||||
|
||||
#clear can buffers
|
||||
clear_can_buffers(p_send)
|
||||
clear_can_buffers(p_recv)
|
||||
|
||||
latencies = []
|
||||
comp_kbps_list = []
|
||||
saturation_pcts = []
|
||||
|
||||
num_messages = 100
|
||||
|
||||
for i in range(num_messages):
|
||||
st = time.time()
|
||||
p_send.can_send(0x1ab, b"message", bus)
|
||||
r = []
|
||||
while len(r) < 1 and (time.time() - st) < 5:
|
||||
r = p_recv.can_recv()
|
||||
et = time.time()
|
||||
r_echo = []
|
||||
while len(r_echo) < 1 and (time.time() - st) < 10:
|
||||
r_echo = p_send.can_recv()
|
||||
|
||||
if len(r) == 0 or len(r_echo) == 0:
|
||||
print("r: {}, r_echo: {}".format(r, r_echo))
|
||||
|
||||
assert_equal(len(r),1)
|
||||
assert_equal(len(r_echo),1)
|
||||
|
||||
et = (et - st)*1000.0
|
||||
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7) / et
|
||||
latency = et - ((1+11+1+1+1+4+8*8+15+1+1+1+7) / speed)
|
||||
|
||||
assert_less(latency, 5.0)
|
||||
|
||||
saturation_pct = (comp_kbps/speed) * 100.0
|
||||
latencies.append(latency)
|
||||
comp_kbps_list.append(comp_kbps)
|
||||
saturation_pcts.append(saturation_pct)
|
||||
|
||||
average_latency = sum(latencies)/num_messages
|
||||
assert_less(average_latency, 1.0)
|
||||
average_comp_kbps = sum(comp_kbps_list)/num_messages
|
||||
average_saturation_pct = sum(saturation_pcts)/num_messages
|
||||
|
||||
print("two pandas bus {}, {} message average at speed {:4d}, latency is {:5.3f}ms, comp speed is {:7.2f}, percent {:6.2f}"\
|
||||
.format(bus, num_messages, speed, average_latency, average_comp_kbps, average_saturation_pct))
|
||||
|
||||
@test_two_black_panda
|
||||
@panda_type_to_serial
|
||||
@panda_connect_and_init
|
||||
def test_black_loopback(panda0, panda1):
|
||||
# disable safety modes
|
||||
panda0.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
panda1.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
|
||||
# disable loopback
|
||||
panda0.set_can_loopback(False)
|
||||
panda1.set_can_loopback(False)
|
||||
|
||||
# clear stuff
|
||||
panda0.can_send_many([(0x1ba, 0, b"testmsg", 0)]*10)
|
||||
time.sleep(0.05)
|
||||
panda0.can_recv()
|
||||
panda1.can_recv()
|
||||
|
||||
# test array (send bus, sender obd, reciever obd, expected busses)
|
||||
test_array = [
|
||||
(0, False, False, [0]),
|
||||
(1, False, False, [1]),
|
||||
(2, False, False, [2]),
|
||||
(0, False, True, [0, 1]),
|
||||
(1, False, True, []),
|
||||
(2, False, True, [2]),
|
||||
(0, True, False, [0]),
|
||||
(1, True, False, [0]),
|
||||
(2, True, False, [2]),
|
||||
(0, True, True, [0, 1]),
|
||||
(1, True, True, [0, 1]),
|
||||
(2, True, True, [2])
|
||||
]
|
||||
|
||||
# test functions
|
||||
def get_test_string():
|
||||
return b"test"+os.urandom(10)
|
||||
|
||||
def _test_buses(send_panda, recv_panda, _test_array):
|
||||
for send_bus, send_obd, recv_obd, recv_buses in _test_array:
|
||||
print("\nSend bus:", send_bus, " Send OBD:", send_obd, " Recv OBD:", recv_obd)
|
||||
|
||||
# set OBD on pandas
|
||||
send_panda.set_gmlan(True if send_obd else None)
|
||||
recv_panda.set_gmlan(True if recv_obd else None)
|
||||
|
||||
# clear buffers
|
||||
clear_can_buffers(send_panda)
|
||||
clear_can_buffers(recv_panda)
|
||||
|
||||
# send the characters
|
||||
at = random.randint(1, 2000)
|
||||
st = get_test_string()[0:8]
|
||||
send_panda.can_send(at, st, send_bus)
|
||||
time.sleep(0.1)
|
||||
|
||||
# check for receive
|
||||
_ = send_panda.can_recv() # cans echo
|
||||
cans_loop = recv_panda.can_recv()
|
||||
|
||||
loop_buses = []
|
||||
for loop in cans_loop:
|
||||
print(" Loop on bus", str(loop[3]))
|
||||
loop_buses.append(loop[3])
|
||||
if len(cans_loop) == 0:
|
||||
print(" No loop")
|
||||
|
||||
# test loop buses
|
||||
recv_buses.sort()
|
||||
loop_buses.sort()
|
||||
assert recv_buses == loop_buses
|
||||
print(" TEST PASSED")
|
||||
print("\n")
|
||||
|
||||
# test both orientations
|
||||
print("***************** TESTING (0 --> 1) *****************")
|
||||
_test_buses(panda0, panda1, test_array)
|
||||
print("***************** TESTING (1 --> 0) *****************")
|
||||
_test_buses(panda1, panda0, test_array)
|
||||
@@ -1,16 +1,23 @@
|
||||
|
||||
import sys
|
||||
import time
|
||||
from .helpers import time_many_sends, connect_wifi, test_white, panda_type_to_serial
|
||||
from .helpers import start_heartbeat_thread, reset_pandas, time_many_sends, connect_wifi, test_white, panda_type_to_serial
|
||||
from panda import Panda, PandaWifiStreaming
|
||||
from nose.tools import assert_less, assert_greater
|
||||
|
||||
# Reset the pandas before running tests
|
||||
def aaaa_reset_before_tests():
|
||||
reset_pandas()
|
||||
|
||||
@test_white
|
||||
@panda_type_to_serial
|
||||
def test_udp_doesnt_drop(serials=None):
|
||||
connect_wifi(serials[0])
|
||||
|
||||
p = Panda(serials[0])
|
||||
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
p.set_can_loopback(True)
|
||||
|
||||
202
panda/tests/automated/7_can_loopback.py
Normal file
202
panda/tests/automated/7_can_loopback.py
Normal file
@@ -0,0 +1,202 @@
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
from panda import Panda
|
||||
from nose.tools import assert_equal, assert_less, assert_greater
|
||||
from .helpers import panda_jungle, start_heartbeat_thread, reset_pandas, time_many_sends, test_all_pandas, test_all_gen2_pandas, clear_can_buffers, panda_connect_and_init
|
||||
|
||||
# Reset the pandas before running tests
|
||||
def aaaa_reset_before_tests():
|
||||
reset_pandas()
|
||||
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
def test_send_recv(p):
|
||||
def test(p_send, p_recv):
|
||||
p_send.set_can_loopback(False)
|
||||
p_recv.set_can_loopback(False)
|
||||
|
||||
p_send.can_send_many([(0x1ba, 0, b"message", 0)]*2)
|
||||
time.sleep(0.05)
|
||||
p_recv.can_recv()
|
||||
p_send.can_recv()
|
||||
|
||||
busses = [0,1,2]
|
||||
|
||||
for bus in busses:
|
||||
for speed in [100, 250, 500, 750, 1000]:
|
||||
p_send.set_can_speed_kbps(bus, speed)
|
||||
p_recv.set_can_speed_kbps(bus, speed)
|
||||
time.sleep(0.05)
|
||||
|
||||
clear_can_buffers(p_send)
|
||||
clear_can_buffers(p_recv)
|
||||
|
||||
comp_kbps = time_many_sends(p_send, bus, p_recv, two_pandas=True)
|
||||
|
||||
saturation_pct = (comp_kbps/speed) * 100.0
|
||||
assert_greater(saturation_pct, 80)
|
||||
assert_less(saturation_pct, 100)
|
||||
|
||||
print("two pandas bus {}, 100 messages at speed {:4d}, comp speed is {:7.2f}, percent {:6.2f}".format(bus, speed, comp_kbps, saturation_pct))
|
||||
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
# Set safety mode and power saving
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
p.set_power_save(False)
|
||||
|
||||
try:
|
||||
# Run tests in both directions
|
||||
test(p, panda_jungle)
|
||||
test(panda_jungle, p)
|
||||
except Exception as e:
|
||||
# Raise errors again, we don't want them to get lost
|
||||
raise e
|
||||
finally:
|
||||
# Set back to silent mode
|
||||
p.set_safety_mode(Panda.SAFETY_SILENT)
|
||||
|
||||
@test_all_pandas
|
||||
@panda_connect_and_init
|
||||
def test_latency(p):
|
||||
def test(p_send, p_recv):
|
||||
p_send.set_can_loopback(False)
|
||||
p_recv.set_can_loopback(False)
|
||||
|
||||
p_send.set_can_speed_kbps(0, 100)
|
||||
p_recv.set_can_speed_kbps(0, 100)
|
||||
time.sleep(0.05)
|
||||
|
||||
p_send.can_send_many([(0x1ba, 0, b"testmsg", 0)]*10)
|
||||
time.sleep(0.05)
|
||||
p_recv.can_recv()
|
||||
p_send.can_recv()
|
||||
|
||||
busses = [0,1,2]
|
||||
|
||||
for bus in busses:
|
||||
for speed in [100, 250, 500, 750, 1000]:
|
||||
p_send.set_can_speed_kbps(bus, speed)
|
||||
p_recv.set_can_speed_kbps(bus, speed)
|
||||
time.sleep(0.1)
|
||||
|
||||
# clear can buffers
|
||||
clear_can_buffers(p_send)
|
||||
clear_can_buffers(p_recv)
|
||||
|
||||
latencies = []
|
||||
comp_kbps_list = []
|
||||
saturation_pcts = []
|
||||
|
||||
num_messages = 100
|
||||
|
||||
for i in range(num_messages):
|
||||
st = time.time()
|
||||
p_send.can_send(0x1ab, b"message", bus)
|
||||
r = []
|
||||
while len(r) < 1 and (time.time() - st) < 5:
|
||||
r = p_recv.can_recv()
|
||||
et = time.time()
|
||||
r_echo = []
|
||||
while len(r_echo) < 1 and (time.time() - st) < 10:
|
||||
r_echo = p_send.can_recv()
|
||||
|
||||
if len(r) == 0 or len(r_echo) == 0:
|
||||
print("r: {}, r_echo: {}".format(r, r_echo))
|
||||
|
||||
assert_equal(len(r),1)
|
||||
assert_equal(len(r_echo),1)
|
||||
|
||||
et = (et - st)*1000.0
|
||||
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7) / et
|
||||
latency = et - ((1+11+1+1+1+4+8*8+15+1+1+1+7) / speed)
|
||||
|
||||
assert_less(latency, 5.0)
|
||||
|
||||
saturation_pct = (comp_kbps/speed) * 100.0
|
||||
latencies.append(latency)
|
||||
comp_kbps_list.append(comp_kbps)
|
||||
saturation_pcts.append(saturation_pct)
|
||||
|
||||
average_latency = sum(latencies)/num_messages
|
||||
assert_less(average_latency, 1.0)
|
||||
average_comp_kbps = sum(comp_kbps_list)/num_messages
|
||||
average_saturation_pct = sum(saturation_pcts)/num_messages
|
||||
|
||||
print("two pandas bus {}, {} message average at speed {:4d}, latency is {:5.3f}ms, comp speed is {:7.2f}, percent {:6.2f}"\
|
||||
.format(bus, num_messages, speed, average_latency, average_comp_kbps, average_saturation_pct))
|
||||
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
# Set safety mode and power saving
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
p.set_power_save(False)
|
||||
|
||||
try:
|
||||
# Run tests in both directions
|
||||
test(p, panda_jungle)
|
||||
test(panda_jungle, p)
|
||||
except Exception as e:
|
||||
# Raise errors again, we don't want them to get lost
|
||||
raise e
|
||||
finally:
|
||||
# Set back to silent mode
|
||||
p.set_safety_mode(Panda.SAFETY_SILENT)
|
||||
|
||||
|
||||
@test_all_gen2_pandas
|
||||
@panda_connect_and_init
|
||||
def test_gen2_loopback(p):
|
||||
def test(p_send, p_recv):
|
||||
for bus in range(4):
|
||||
obd = False
|
||||
if bus == 3:
|
||||
obd = True
|
||||
bus = 1
|
||||
|
||||
# Clear buses
|
||||
clear_can_buffers(p_send)
|
||||
clear_can_buffers(p_recv)
|
||||
|
||||
# Send a random string
|
||||
addr = random.randint(1, 2000)
|
||||
string = b"test"+os.urandom(4)
|
||||
p_send.set_obd(obd)
|
||||
p_recv.set_obd(obd)
|
||||
time.sleep(0.2)
|
||||
p_send.can_send(addr, string, bus)
|
||||
time.sleep(0.2)
|
||||
|
||||
content = p_recv.can_recv()
|
||||
|
||||
# Check amount of messages
|
||||
assert len(content) == 1
|
||||
|
||||
# Check content
|
||||
assert content[0][0] == addr and content[0][2] == string
|
||||
|
||||
# Check bus
|
||||
assert content[0][3] == bus
|
||||
|
||||
print("Bus:", bus, "OBD:", obd, "OK")
|
||||
|
||||
# Start heartbeat
|
||||
start_heartbeat_thread(p)
|
||||
|
||||
# Set safety mode and power saving
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
p.set_power_save(False)
|
||||
|
||||
try:
|
||||
# Run tests in both directions
|
||||
test(p, panda_jungle)
|
||||
test(panda_jungle, p)
|
||||
except Exception as e:
|
||||
# Raise errors again, we don't want them to get lost
|
||||
raise e
|
||||
finally:
|
||||
# Set back to silent mode
|
||||
p.set_safety_mode(Panda.SAFETY_SILENT)
|
||||
@@ -5,21 +5,52 @@ import random
|
||||
import subprocess
|
||||
import requests
|
||||
import _thread
|
||||
import faulthandler
|
||||
from functools import wraps
|
||||
from panda import Panda
|
||||
from panda_jungle import PandaJungle # pylint: disable=import-error
|
||||
from nose.tools import assert_equal
|
||||
from parameterized import parameterized, param
|
||||
from .timeout import run_with_timeout
|
||||
from .wifi_helpers import _connect_wifi
|
||||
|
||||
SPEED_NORMAL = 500
|
||||
SPEED_GMLAN = 33.3
|
||||
BUS_SPEEDS = [(0, SPEED_NORMAL), (1, SPEED_NORMAL), (2, SPEED_NORMAL), (3, SPEED_GMLAN)]
|
||||
TIMEOUT = 30
|
||||
GEN2_HW_TYPES = [Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_UNO]
|
||||
|
||||
# Enable fault debug
|
||||
faulthandler.enable(all_threads=False)
|
||||
|
||||
# Connect to Panda Jungle
|
||||
panda_jungle = PandaJungle()
|
||||
|
||||
# Find all panda's connected
|
||||
_panda_serials = None
|
||||
def init_panda_serials():
|
||||
global panda_jungle, _panda_serials
|
||||
_panda_serials = []
|
||||
panda_jungle.set_panda_power(True)
|
||||
time.sleep(5)
|
||||
for serial in Panda.list():
|
||||
p = Panda(serial=serial)
|
||||
_panda_serials.append((serial, p.get_type()))
|
||||
p.close()
|
||||
print('Found', str(len(_panda_serials)), 'pandas')
|
||||
init_panda_serials()
|
||||
|
||||
# Panda providers
|
||||
test_all_types = parameterized([
|
||||
param(panda_type=Panda.HW_TYPE_WHITE_PANDA),
|
||||
param(panda_type=Panda.HW_TYPE_GREY_PANDA),
|
||||
param(panda_type=Panda.HW_TYPE_BLACK_PANDA)
|
||||
])
|
||||
test_all_pandas = parameterized(
|
||||
Panda.list()
|
||||
list(map(lambda x: x[0], _panda_serials))
|
||||
)
|
||||
test_all_gen2_pandas = parameterized(
|
||||
list(map(lambda x: x[0], filter(lambda x: x[1] in GEN2_HW_TYPES, _panda_serials)))
|
||||
)
|
||||
test_white_and_grey = parameterized([
|
||||
param(panda_type=Panda.HW_TYPE_WHITE_PANDA),
|
||||
@@ -31,13 +62,8 @@ test_white = parameterized([
|
||||
test_grey = parameterized([
|
||||
param(panda_type=Panda.HW_TYPE_GREY_PANDA)
|
||||
])
|
||||
test_two_panda = parameterized([
|
||||
param(panda_type=[Panda.HW_TYPE_GREY_PANDA, Panda.HW_TYPE_WHITE_PANDA]),
|
||||
param(panda_type=[Panda.HW_TYPE_WHITE_PANDA, Panda.HW_TYPE_GREY_PANDA]),
|
||||
param(panda_type=[Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_BLACK_PANDA])
|
||||
])
|
||||
test_two_black_panda = parameterized([
|
||||
param(panda_type=[Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_BLACK_PANDA])
|
||||
test_black = parameterized([
|
||||
param(panda_type=Panda.HW_TYPE_BLACK_PANDA)
|
||||
])
|
||||
|
||||
def connect_wifi(serial=None):
|
||||
@@ -47,111 +73,26 @@ def connect_wifi(serial=None):
|
||||
assert(dongle_id.isalnum())
|
||||
_connect_wifi(dongle_id, pw)
|
||||
|
||||
FNULL = open(os.devnull, 'w')
|
||||
def _connect_wifi(dongle_id, pw, insecure_okay=False):
|
||||
ssid = "panda-" + dongle_id
|
||||
|
||||
r = subprocess.call(["ping", "-W", "4", "-c", "1", "192.168.0.10"], stdout=FNULL, stderr=subprocess.STDOUT)
|
||||
if not r:
|
||||
#Can already ping, try connecting on wifi
|
||||
try:
|
||||
p = Panda("WIFI")
|
||||
p.get_serial()
|
||||
print("Already connected")
|
||||
return
|
||||
except:
|
||||
pass
|
||||
|
||||
print("WIFI: connecting to %s" % ssid)
|
||||
|
||||
while 1:
|
||||
if sys.platform == "darwin":
|
||||
os.system("networksetup -setairportnetwork en0 %s %s" % (ssid, pw))
|
||||
else:
|
||||
wlan_interface = subprocess.check_output(["sh", "-c", "iw dev | awk '/Interface/ {print $2}'"]).strip().decode('utf8')
|
||||
cnt = 0
|
||||
MAX_TRIES = 10
|
||||
while cnt < MAX_TRIES:
|
||||
print("WIFI: scanning %d" % cnt)
|
||||
os.system("iwlist %s scanning > /dev/null" % wlan_interface)
|
||||
os.system("nmcli device wifi rescan")
|
||||
wifi_networks = [x.decode("utf8") for x in subprocess.check_output(["nmcli","dev", "wifi", "list"]).split(b"\n")]
|
||||
wifi_scan = [x for x in wifi_networks if ssid in x]
|
||||
if len(wifi_scan) != 0:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
# MAX_TRIES tries, ~10 seconds max
|
||||
cnt += 1
|
||||
assert cnt < MAX_TRIES
|
||||
if "-pair" in wifi_scan[0]:
|
||||
os.system("nmcli d wifi connect %s-pair" % (ssid))
|
||||
connect_cnt = 0
|
||||
MAX_TRIES = 100
|
||||
while connect_cnt < MAX_TRIES:
|
||||
connect_cnt += 1
|
||||
r = subprocess.call(["ping", "-W", "4", "-c", "1", "192.168.0.10"], stdout=FNULL, stderr=subprocess.STDOUT)
|
||||
if r:
|
||||
print("Waiting for panda to ping...")
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
break
|
||||
if insecure_okay:
|
||||
break
|
||||
# fetch webpage
|
||||
print("connecting to insecure network to secure")
|
||||
try:
|
||||
r = requests.get("http://192.168.0.10/")
|
||||
except requests.ConnectionError:
|
||||
r = requests.get("http://192.168.0.10/")
|
||||
assert r.status_code==200
|
||||
|
||||
print("securing")
|
||||
try:
|
||||
r = requests.get("http://192.168.0.10/secure", timeout=0.01)
|
||||
except requests.exceptions.Timeout:
|
||||
print("timeout http request to secure")
|
||||
pass
|
||||
else:
|
||||
ret = os.system("nmcli d wifi connect %s password %s" % (ssid, pw))
|
||||
if os.WEXITSTATUS(ret) == 0:
|
||||
#check ping too
|
||||
ping_ok = False
|
||||
connect_cnt = 0
|
||||
MAX_TRIES = 10
|
||||
while connect_cnt < MAX_TRIES:
|
||||
connect_cnt += 1
|
||||
r = subprocess.call(["ping", "-W", "4", "-c", "1", "192.168.0.10"], stdout=FNULL, stderr=subprocess.STDOUT)
|
||||
if r:
|
||||
print("Waiting for panda to ping...")
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
ping_ok = True
|
||||
break
|
||||
if ping_ok:
|
||||
break
|
||||
|
||||
# TODO: confirm that it's connected to the right panda
|
||||
|
||||
def time_many_sends(p, bus, precv=None, msg_count=100, msg_id=None, two_pandas=False):
|
||||
if precv == None:
|
||||
precv = p
|
||||
def time_many_sends(p, bus, p_recv=None, msg_count=100, msg_id=None, two_pandas=False):
|
||||
if p_recv == None:
|
||||
p_recv = p
|
||||
if msg_id == None:
|
||||
msg_id = random.randint(0x100, 0x200)
|
||||
if p == precv and two_pandas:
|
||||
if p == p_recv and two_pandas:
|
||||
raise ValueError("Cannot have two pandas that are the same panda")
|
||||
|
||||
st = time.time()
|
||||
start_time = time.time()
|
||||
p.can_send_many([(msg_id, 0, b"\xaa"*8, bus)]*msg_count)
|
||||
r = []
|
||||
r_echo = []
|
||||
r_len_expected = msg_count if two_pandas else msg_count*2
|
||||
r_echo_len_exected = msg_count if two_pandas else 0
|
||||
|
||||
while len(r) < r_len_expected and (time.time() - st) < 5:
|
||||
r.extend(precv.can_recv())
|
||||
et = time.time()
|
||||
while len(r) < r_len_expected and (time.time() - start_time) < 5:
|
||||
r.extend(p_recv.can_recv())
|
||||
end_time = time.time()
|
||||
if two_pandas:
|
||||
while len(r_echo) < r_echo_len_exected and (time.time() - st) < 10:
|
||||
while len(r_echo) < r_echo_len_exected and (time.time() - start_time) < 10:
|
||||
r_echo.extend(p.can_recv())
|
||||
|
||||
sent_echo = [x for x in r if x[3] == 0x80 | bus and x[0] == msg_id]
|
||||
@@ -164,12 +105,17 @@ def time_many_sends(p, bus, precv=None, msg_count=100, msg_id=None, two_pandas=F
|
||||
assert_equal(len(resp), msg_count)
|
||||
assert_equal(len(sent_echo), msg_count)
|
||||
|
||||
et = (et-st)*1000.0
|
||||
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7)*msg_count / et
|
||||
end_time = (end_time-start_time)*1000.0
|
||||
comp_kbps = (1+11+1+1+1+4+8*8+15+1+1+1+7)*msg_count / end_time
|
||||
|
||||
return comp_kbps
|
||||
|
||||
_panda_serials = None
|
||||
def reset_pandas():
|
||||
panda_jungle.set_panda_power(False)
|
||||
time.sleep(2)
|
||||
panda_jungle.set_panda_power(True)
|
||||
time.sleep(5)
|
||||
|
||||
def panda_type_to_serial(fn):
|
||||
@wraps(fn)
|
||||
def wrapper(panda_type=None, **kwargs):
|
||||
@@ -181,11 +127,7 @@ def panda_type_to_serial(fn):
|
||||
# If not done already, get panda serials and their type
|
||||
global _panda_serials
|
||||
if _panda_serials == None:
|
||||
_panda_serials = []
|
||||
for serial in Panda.list():
|
||||
p = Panda(serial=serial)
|
||||
_panda_serials.append((serial, p.get_type()))
|
||||
p.close()
|
||||
init_panda_serials()
|
||||
|
||||
# Find a panda with the correct types and add the corresponding serial
|
||||
serials = []
|
||||
@@ -202,13 +144,15 @@ def panda_type_to_serial(fn):
|
||||
return fn(serials, **kwargs)
|
||||
return wrapper
|
||||
|
||||
def heartbeat_thread(p):
|
||||
while True:
|
||||
try:
|
||||
p.send_heartbeat()
|
||||
time.sleep(1)
|
||||
except:
|
||||
break
|
||||
def start_heartbeat_thread(p):
|
||||
def heartbeat_thread(p):
|
||||
while True:
|
||||
try:
|
||||
p.send_heartbeat()
|
||||
time.sleep(1)
|
||||
except:
|
||||
break
|
||||
_thread.start_new_thread(heartbeat_thread, (p,))
|
||||
|
||||
def panda_connect_and_init(fn):
|
||||
@wraps(fn)
|
||||
@@ -223,25 +167,38 @@ def panda_connect_and_init(fn):
|
||||
for panda_serial in panda_serials:
|
||||
pandas.append(Panda(serial=panda_serial))
|
||||
|
||||
# Initialize jungle
|
||||
clear_can_buffers(panda_jungle)
|
||||
panda_jungle.set_can_loopback(False)
|
||||
panda_jungle.set_obd(False)
|
||||
panda_jungle.set_harness_orientation(PandaJungle.HARNESS_ORIENTATION_1)
|
||||
for bus, speed in BUS_SPEEDS:
|
||||
panda_jungle.set_can_speed_kbps(bus, speed)
|
||||
|
||||
# Initialize pandas
|
||||
for panda in pandas:
|
||||
panda.set_can_loopback(False)
|
||||
panda.set_gmlan(None)
|
||||
panda.set_esp_power(False)
|
||||
for bus, speed in [(0, SPEED_NORMAL), (1, SPEED_NORMAL), (2, SPEED_NORMAL), (3, SPEED_GMLAN)]:
|
||||
panda.set_power_save(False)
|
||||
for bus, speed in BUS_SPEEDS:
|
||||
panda.set_can_speed_kbps(bus, speed)
|
||||
clear_can_buffers(panda)
|
||||
_thread.start_new_thread(heartbeat_thread, (panda,))
|
||||
panda.set_power_save(False)
|
||||
|
||||
# Run test function
|
||||
ret = fn(*pandas, **kwargs)
|
||||
try:
|
||||
run_with_timeout(TIMEOUT, fn, *pandas, **kwargs)
|
||||
|
||||
# Close all connections
|
||||
for panda in pandas:
|
||||
panda.close()
|
||||
|
||||
# Return test function result
|
||||
return ret
|
||||
# Check if the pandas did not throw any faults while running test
|
||||
for panda in pandas:
|
||||
panda.reconnect()
|
||||
assert panda.health()['fault_status'] == 0
|
||||
except Exception as e:
|
||||
raise e
|
||||
finally:
|
||||
# Close all connections
|
||||
for panda in pandas:
|
||||
panda.close()
|
||||
return wrapper
|
||||
|
||||
def clear_can_buffers(panda):
|
||||
|
||||
25
panda/tests/automated/timeout.py
Normal file
25
panda/tests/automated/timeout.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import time
|
||||
from multiprocessing import Process
|
||||
|
||||
# Note: this does not return any return values of the function, just the exit status
|
||||
INTERVAL = 0.1
|
||||
def run_with_timeout(timeout, fn, *kwargs):
|
||||
def runner(fn, kwargs):
|
||||
try:
|
||||
fn(*kwargs)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
raise e
|
||||
|
||||
process = Process(target=runner, args=(fn, kwargs))
|
||||
process.start()
|
||||
|
||||
counter = 0
|
||||
while process.is_alive():
|
||||
time.sleep(INTERVAL)
|
||||
counter+=1
|
||||
if (counter * INTERVAL) > timeout:
|
||||
process.terminate()
|
||||
raise TimeoutError("Function timed out!")
|
||||
if process.exitcode != 0:
|
||||
raise RuntimeError("Test failed with exit code: ", str(process.exitcode))
|
||||
85
panda/tests/automated/wifi_helpers.py
Normal file
85
panda/tests/automated/wifi_helpers.py
Normal file
@@ -0,0 +1,85 @@
|
||||
import os
|
||||
FNULL = open(os.devnull, 'w')
|
||||
def _connect_wifi(dongle_id, pw, insecure_okay=False):
|
||||
ssid = "panda-" + dongle_id
|
||||
|
||||
r = subprocess.call(["ping", "-W", "4", "-c", "1", "192.168.0.10"], stdout=FNULL, stderr=subprocess.STDOUT)
|
||||
if not r:
|
||||
# Can already ping, try connecting on wifi
|
||||
try:
|
||||
p = Panda("WIFI")
|
||||
p.get_serial()
|
||||
print("Already connected")
|
||||
return
|
||||
except:
|
||||
pass
|
||||
|
||||
print("WIFI: connecting to %s" % ssid)
|
||||
|
||||
while 1:
|
||||
if sys.platform == "darwin":
|
||||
os.system("networksetup -setairportnetwork en0 %s %s" % (ssid, pw))
|
||||
else:
|
||||
wlan_interface = subprocess.check_output(["sh", "-c", "iw dev | awk '/Interface/ {print $2}'"]).strip().decode('utf8')
|
||||
cnt = 0
|
||||
MAX_TRIES = 10
|
||||
while cnt < MAX_TRIES:
|
||||
print("WIFI: scanning %d" % cnt)
|
||||
os.system("iwlist %s scanning > /dev/null" % wlan_interface)
|
||||
os.system("nmcli device wifi rescan")
|
||||
wifi_networks = [x.decode("utf8") for x in subprocess.check_output(["nmcli","dev", "wifi", "list"]).split(b"\n")]
|
||||
wifi_scan = [x for x in wifi_networks if ssid in x]
|
||||
if len(wifi_scan) != 0:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
# MAX_TRIES tries, ~10 seconds max
|
||||
cnt += 1
|
||||
assert cnt < MAX_TRIES
|
||||
if "-pair" in wifi_scan[0]:
|
||||
os.system("nmcli d wifi connect %s-pair" % (ssid))
|
||||
connect_cnt = 0
|
||||
MAX_TRIES = 100
|
||||
while connect_cnt < MAX_TRIES:
|
||||
connect_cnt += 1
|
||||
r = subprocess.call(["ping", "-W", "4", "-c", "1", "192.168.0.10"], stdout=FNULL, stderr=subprocess.STDOUT)
|
||||
if r:
|
||||
print("Waiting for panda to ping...")
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
break
|
||||
if insecure_okay:
|
||||
break
|
||||
# fetch webpage
|
||||
print("connecting to insecure network to secure")
|
||||
try:
|
||||
r = requests.get("http://192.168.0.10/")
|
||||
except requests.ConnectionError:
|
||||
r = requests.get("http://192.168.0.10/")
|
||||
assert r.status_code==200
|
||||
|
||||
print("securing")
|
||||
try:
|
||||
r = requests.get("http://192.168.0.10/secure", timeout=0.01)
|
||||
except requests.exceptions.Timeout:
|
||||
print("timeout http request to secure")
|
||||
pass
|
||||
else:
|
||||
ret = os.system("nmcli d wifi connect %s password %s" % (ssid, pw))
|
||||
if os.WEXITSTATUS(ret) == 0:
|
||||
#check ping too
|
||||
ping_ok = False
|
||||
connect_cnt = 0
|
||||
MAX_TRIES = 10
|
||||
while connect_cnt < MAX_TRIES:
|
||||
connect_cnt += 1
|
||||
r = subprocess.call(["ping", "-W", "4", "-c", "1", "192.168.0.10"], stdout=FNULL, stderr=subprocess.STDOUT)
|
||||
if r:
|
||||
print("Waiting for panda to ping...")
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
ping_ok = True
|
||||
break
|
||||
if ping_ok:
|
||||
break
|
||||
|
||||
# TODO: confirm that it's connected to the right panda
|
||||
@@ -67,7 +67,7 @@ def run_test(sleep_duration):
|
||||
print("Number of cycles:", counter, "Non-zero bus errors:", nonzero_bus_errors, "Zero bus errors:", zero_bus_errors, "Content errors:", content_errors)
|
||||
|
||||
# Toggle relay
|
||||
black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT)
|
||||
black_panda.set_safety_mode(Panda.SAFETY_SILENT)
|
||||
time.sleep(1)
|
||||
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
time.sleep(1)
|
||||
|
||||
@@ -72,7 +72,7 @@ def run_test(sleep_duration):
|
||||
|
||||
if (time.time() - temp_start_time) > 3600*6:
|
||||
# Toggle relay
|
||||
black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT)
|
||||
black_panda.set_safety_mode(Panda.SAFETY_SILENT)
|
||||
time.sleep(1)
|
||||
black_panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
time.sleep(1)
|
||||
|
||||
@@ -74,7 +74,7 @@ def run_test(sleep_duration):
|
||||
assert False
|
||||
|
||||
# Switch off relay
|
||||
black_panda.set_safety_mode(Panda.SAFETY_NOOUTPUT)
|
||||
black_panda.set_safety_mode(Panda.SAFETY_SILENT)
|
||||
time.sleep(0.05)
|
||||
|
||||
if not test_buses(black_panda, other_panda, (0, False, [0, 2])):
|
||||
|
||||
@@ -44,6 +44,6 @@ if __name__ == "__main__":
|
||||
if claim:
|
||||
panda.serial_write(port_number, ln)
|
||||
time.sleep(0.01)
|
||||
except:
|
||||
except Exception:
|
||||
print("panda disconnected!")
|
||||
time.sleep(0.5);
|
||||
|
||||
50
panda/tests/development/register_hashmap_spread.py
Executable file
50
panda/tests/development/register_hashmap_spread.py
Executable file
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env python3
|
||||
import matplotlib.pyplot as plt # pylint: disable=import-error
|
||||
|
||||
HASHING_PRIME = 23
|
||||
REGISTER_MAP_SIZE = 0x3FF
|
||||
BYTES_PER_REG = 4
|
||||
|
||||
# From ST32F413 datasheet
|
||||
REGISTER_ADDRESS_REGIONS = [
|
||||
(0x40000000, 0x40007FFF),
|
||||
(0x40010000, 0x400107FF),
|
||||
(0x40011000, 0x400123FF),
|
||||
(0x40012C00, 0x40014BFF),
|
||||
(0x40015000, 0x400153FF),
|
||||
(0x40015800, 0x40015BFF),
|
||||
(0x40016000, 0x400167FF),
|
||||
(0x40020000, 0x40021FFF),
|
||||
(0x40023000, 0x400233FF),
|
||||
(0x40023800, 0x40023FFF),
|
||||
(0x40026000, 0x400267FF),
|
||||
(0x50000000, 0x5003FFFF),
|
||||
(0x50060000, 0x500603FF),
|
||||
(0x50060800, 0x50060BFF),
|
||||
(0x50060800, 0x50060BFF),
|
||||
(0xE0000000, 0xE00FFFFF)
|
||||
]
|
||||
|
||||
def hash(reg_addr):
|
||||
return (((reg_addr >> 16) ^ ((((reg_addr + 1) & 0xFFFF) * HASHING_PRIME) & 0xFFFF)) & REGISTER_MAP_SIZE)
|
||||
|
||||
# Calculate hash for each address
|
||||
hashes = []
|
||||
double_hashes = []
|
||||
for (start_addr, stop_addr) in REGISTER_ADDRESS_REGIONS:
|
||||
for addr in range(start_addr, stop_addr+1, BYTES_PER_REG):
|
||||
h = hash(addr)
|
||||
hashes.append(h)
|
||||
double_hashes.append(hash(h))
|
||||
|
||||
# Make histograms
|
||||
plt.subplot(2, 1, 1)
|
||||
plt.hist(hashes, bins=REGISTER_MAP_SIZE)
|
||||
plt.title("Number of collisions per hash")
|
||||
plt.xlabel("Address")
|
||||
|
||||
plt.subplot(2, 1, 2)
|
||||
plt.hist(double_hashes, bins=REGISTER_MAP_SIZE)
|
||||
plt.title("Number of collisions per double hash")
|
||||
plt.xlabel("Address")
|
||||
plt.show()
|
||||
35
panda/tests/echo.py
Executable file
35
panda/tests/echo.py
Executable file
@@ -0,0 +1,35 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import _thread
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
|
||||
from panda import Panda
|
||||
|
||||
# This script is intended to be used in conjunction with the echo_loopback_test.py test script from panda jungle.
|
||||
# It sends a reversed response back for every message received containing b"test".
|
||||
|
||||
def heartbeat_thread(p):
|
||||
while True:
|
||||
try:
|
||||
p.send_heartbeat()
|
||||
time.sleep(1)
|
||||
except:
|
||||
break
|
||||
|
||||
# Resend every CAN message that has been received on the same bus, but with the data reversed
|
||||
if __name__ == "__main__":
|
||||
p = Panda()
|
||||
_thread.start_new_thread(heartbeat_thread, (p,))
|
||||
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
p.set_power_save(False)
|
||||
|
||||
while True:
|
||||
incoming = p.can_recv()
|
||||
for message in incoming:
|
||||
address, notused, data, bus = message
|
||||
if b'test' in data:
|
||||
p.can_send(address, data[::-1], bus)
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/usr/bin/env python
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
1
panda/tests/misra/.gitignore
vendored
Normal file
1
panda/tests/misra/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
cppcheck/
|
||||
@@ -7,7 +7,7 @@
|
||||
2.4 X (Cppcheck)
|
||||
2.5
|
||||
2.6 X (Cppcheck)
|
||||
2.7
|
||||
2.7 X (Addon)
|
||||
3.1 X (Addon)
|
||||
3.2 X (Addon)
|
||||
4.1 X (Addon)
|
||||
|
||||
@@ -4,7 +4,7 @@ mkdir /tmp/misra || true
|
||||
git clone https://github.com/danmar/cppcheck.git || true
|
||||
cd cppcheck
|
||||
git fetch
|
||||
git checkout bdd41151ed550e3d240a6dd6847859216b7ad736
|
||||
git checkout e46191e6e809272d8b34feca8999ee413f716b80
|
||||
make -j4
|
||||
cd ../../../
|
||||
|
||||
|
||||
36
panda/tests/safety/common.py
Normal file
36
panda/tests/safety/common.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
|
||||
def make_msg(bus, addr, length=8):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
if addr >= 0x800:
|
||||
to_send[0].RIR = (addr << 3) | 5
|
||||
else:
|
||||
to_send[0].RIR = (addr << 21) | 1
|
||||
to_send[0].RDTR = length
|
||||
to_send[0].RDTR |= bus << 4
|
||||
|
||||
return to_send
|
||||
|
||||
def test_relay_malfunction(test, addr):
|
||||
# input is a test class and the address that, if seen on bus 0, triggers
|
||||
# the relay_malfunction protection logic: both tx_hook and fwd_hook are
|
||||
# expected to return failure
|
||||
test.assertFalse(test.safety.get_relay_malfunction())
|
||||
test.safety.safety_rx_hook(make_msg(0, addr, 8))
|
||||
test.assertTrue(test.safety.get_relay_malfunction())
|
||||
for a in range(1, 0x800):
|
||||
for b in range(0, 3):
|
||||
test.assertFalse(test.safety.safety_tx_hook(make_msg(b, a, 8)))
|
||||
test.assertEqual(-1, test.safety.safety_fwd_hook(b, make_msg(b, a, 8)))
|
||||
|
||||
def test_manually_enable_controls_allowed(test):
|
||||
test.safety.set_controls_allowed(1)
|
||||
test.assertTrue(test.safety.get_controls_allowed())
|
||||
test.safety.set_controls_allowed(0)
|
||||
test.assertFalse(test.safety.get_controls_allowed())
|
||||
|
||||
def test_spam_can_buses(test, TX_MSGS):
|
||||
for addr in range(1, 0x800):
|
||||
for bus in range(0, 4):
|
||||
if all(addr != m[0] or bus != m[1] for m in TX_MSGS):
|
||||
test.assertFalse(test.safety.safety_tx_hook(make_msg(bus, addr, 8)))
|
||||
@@ -34,6 +34,8 @@ bool board_has_relay(void);
|
||||
|
||||
void set_controls_allowed(bool c);
|
||||
bool get_controls_allowed(void);
|
||||
void set_relay_malfunction(bool c);
|
||||
bool get_relay_malfunction(void);
|
||||
void set_long_controls_allowed(bool c);
|
||||
bool get_long_controls_allowed(void);
|
||||
void set_gas_interceptor_detected(bool c);
|
||||
@@ -46,7 +48,7 @@ void reset_angle_control(void);
|
||||
void safety_rx_hook(CAN_FIFOMailBox_TypeDef *to_send);
|
||||
int safety_tx_hook(CAN_FIFOMailBox_TypeDef *to_push);
|
||||
int safety_fwd_hook(int bus_num, CAN_FIFOMailBox_TypeDef *to_fwd);
|
||||
int safety_set_mode(uint16_t mode, int16_t param);
|
||||
int set_safety_hooks(uint16_t mode, int16_t param);
|
||||
|
||||
void init_tests_toyota(void);
|
||||
int get_toyota_torque_meas_min(void);
|
||||
@@ -54,7 +56,6 @@ int get_toyota_torque_meas_max(void);
|
||||
int get_toyota_gas_prev(void);
|
||||
void set_toyota_torque_meas(int min, int max);
|
||||
void set_toyota_desired_torque_last(int t);
|
||||
void set_toyota_camera_forwarded(int t);
|
||||
void set_toyota_rt_torque_last(int t);
|
||||
|
||||
void init_tests_honda(void);
|
||||
@@ -80,13 +81,10 @@ void init_tests_hyundai(void);
|
||||
void set_hyundai_desired_torque_last(int t);
|
||||
void set_hyundai_rt_torque_last(int t);
|
||||
void set_hyundai_torque_driver(int min, int max);
|
||||
void set_hyundai_giraffe_switch_2(int t);
|
||||
void set_hyundai_camera_bus(int t);
|
||||
|
||||
void init_tests_chrysler(void);
|
||||
void set_chrysler_desired_torque_last(int t);
|
||||
void set_chrysler_rt_torque_last(int t);
|
||||
void set_chrysler_camera_detected(int t);
|
||||
int get_chrysler_torque_meas_min(void);
|
||||
int get_chrysler_torque_meas_max(void);
|
||||
void set_chrysler_torque_meas(int min, int max);
|
||||
|
||||
@@ -42,6 +42,8 @@ TIM_TypeDef *TIM2 = &timer;
|
||||
#define HW_TYPE_PEDAL 4U
|
||||
#define HW_TYPE_UNO 5U
|
||||
|
||||
#define ALLOW_DEBUG
|
||||
|
||||
// from main_declarations.h
|
||||
uint8_t hw_type = HW_TYPE_UNKNOWN;
|
||||
|
||||
@@ -80,6 +82,10 @@ void set_controls_allowed(bool c){
|
||||
controls_allowed = c;
|
||||
}
|
||||
|
||||
void set_relay_malfunction(bool c){
|
||||
relay_malfunction = c;
|
||||
}
|
||||
|
||||
void set_long_controls_allowed(bool c){
|
||||
long_controls_allowed = c;
|
||||
}
|
||||
@@ -96,6 +102,10 @@ bool get_controls_allowed(void){
|
||||
return controls_allowed;
|
||||
}
|
||||
|
||||
bool get_relay_malfunction(void){
|
||||
return relay_malfunction;
|
||||
}
|
||||
|
||||
bool get_long_controls_allowed(void){
|
||||
return long_controls_allowed;
|
||||
}
|
||||
@@ -116,10 +126,6 @@ void set_timer(uint32_t t){
|
||||
timer.CNT = t;
|
||||
}
|
||||
|
||||
void set_toyota_camera_forwarded(int t){
|
||||
toyota_camera_forwarded = t;
|
||||
}
|
||||
|
||||
void set_toyota_torque_meas(int min, int max){
|
||||
toyota_torque_meas.min = min;
|
||||
toyota_torque_meas.max = max;
|
||||
@@ -140,18 +146,6 @@ void set_hyundai_torque_driver(int min, int max){
|
||||
hyundai_torque_driver.max = max;
|
||||
}
|
||||
|
||||
void set_hyundai_camera_bus(int t){
|
||||
hyundai_camera_bus = t;
|
||||
}
|
||||
|
||||
void set_hyundai_giraffe_switch_2(int t){
|
||||
hyundai_giraffe_switch_2 = t;
|
||||
}
|
||||
|
||||
void set_chrysler_camera_detected(int t){
|
||||
chrysler_camera_detected = t;
|
||||
}
|
||||
|
||||
void set_chrysler_torque_meas(int min, int max){
|
||||
chrysler_torque_meas.min = min;
|
||||
chrysler_torque_meas.max = max;
|
||||
@@ -278,6 +272,7 @@ void set_honda_fwd_brake(bool c){
|
||||
void init_tests(void){
|
||||
// get HW_TYPE from env variable set in test.sh
|
||||
hw_type = atoi(getenv("HW_TYPE"));
|
||||
safety_mode_cnt = 2U; // avoid ignoring relay_malfunction logic
|
||||
}
|
||||
|
||||
void init_tests_toyota(void){
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import numpy as np
|
||||
import libpandasafety_py # pylint: disable=import-error
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import make_msg, test_manually_enable_controls_allowed, test_spam_can_buses
|
||||
|
||||
|
||||
MAX_RATE_UP = 2
|
||||
MAX_RATE_DOWN = 5
|
||||
@@ -16,6 +18,8 @@ DRIVER_TORQUE_FACTOR = 4;
|
||||
|
||||
IPAS_OVERRIDE_THRESHOLD = 200
|
||||
|
||||
TX_MSGS = [[0x151, 2], [0x152, 0], [0x153, 2], [0x154, 0]]
|
||||
|
||||
def twos_comp(val, bits):
|
||||
if val >= 0:
|
||||
return val
|
||||
@@ -32,59 +36,42 @@ class TestCadillacSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.safety_set_mode(Panda.SAFETY_CADILLAC, 0)
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_CADILLAC, 0)
|
||||
cls.safety.init_tests_cadillac()
|
||||
|
||||
def _send_msg(self, bus, addr, length):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = addr << 21
|
||||
to_send[0].RDTR = length
|
||||
to_send[0].RDTR = bus << 4
|
||||
return to_send
|
||||
|
||||
def _set_prev_torque(self, t):
|
||||
self.safety.set_cadillac_desired_torque_last(t)
|
||||
self.safety.set_cadillac_rt_torque_last(t)
|
||||
|
||||
def _torque_driver_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x164 << 21
|
||||
|
||||
t = twos_comp(torque, 11)
|
||||
to_send = make_msg(0, 0x164)
|
||||
to_send[0].RDLR = ((t >> 8) & 0x7) | ((t & 0xFF) << 8)
|
||||
return to_send
|
||||
|
||||
def _torque_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x151 << 21
|
||||
|
||||
to_send = make_msg(2, 0x151)
|
||||
t = twos_comp(torque, 14)
|
||||
to_send[0].RDLR = ((t >> 8) & 0x3F) | ((t & 0xFF) << 8)
|
||||
return to_send
|
||||
|
||||
def test_spam_can_buses(self):
|
||||
test_spam_can_buses(self, TX_MSGS)
|
||||
|
||||
def test_default_controls_not_allowed(self):
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_manually_enable_controls_allowed(self):
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.set_controls_allowed(0)
|
||||
test_manually_enable_controls_allowed(self)
|
||||
|
||||
def test_enable_control_allowed_from_cruise(self):
|
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_push[0].RIR = 0x370 << 21
|
||||
to_push = make_msg(0, 0x370)
|
||||
to_push[0].RDLR = 0x800000
|
||||
to_push[0].RDTR = 0
|
||||
|
||||
self.safety.safety_rx_hook(to_push)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
|
||||
def test_disable_control_allowed_from_cruise(self):
|
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_push[0].RIR = 0x370 << 21
|
||||
to_push[0].RDLR = 0
|
||||
to_push[0].RDTR = 0
|
||||
|
||||
to_push = make_msg(0, 0x370)
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.safety.safety_rx_hook(to_push)
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
@@ -190,7 +177,7 @@ class TestCadillacSafety(unittest.TestCase):
|
||||
for b in buss:
|
||||
for m in msgs:
|
||||
# assume len 8
|
||||
self.assertEqual(-1, self.safety.safety_fwd_hook(b, self._send_msg(b, m, 8)))
|
||||
self.assertEqual(-1, self.safety.safety_fwd_hook(b, make_msg(b, m, 8)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import numpy as np
|
||||
import libpandasafety_py # pylint: disable=import-error
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import test_relay_malfunction, make_msg, test_manually_enable_controls_allowed, test_spam_can_buses
|
||||
|
||||
MAX_RATE_UP = 3
|
||||
MAX_RATE_DOWN = 3
|
||||
@@ -13,40 +14,17 @@ RT_INTERVAL = 250000
|
||||
|
||||
MAX_TORQUE_ERROR = 80
|
||||
|
||||
def twos_comp(val, bits):
|
||||
if val >= 0:
|
||||
return val
|
||||
else:
|
||||
return (2**bits) + val
|
||||
|
||||
def sign(a):
|
||||
if a > 0:
|
||||
return 1
|
||||
else:
|
||||
return -1
|
||||
|
||||
def swap_bytes(data_str):
|
||||
"""Accepts string with hex, returns integer with order swapped for CAN."""
|
||||
a = int(data_str, 16)
|
||||
return ((a & 0xff) << 24) + ((a & 0xff00) << 8) + ((a & 0x00ff0000) >> 8) + ((a & 0xff000000) >> 24)
|
||||
TX_MSGS = [[571, 0], [658, 0], [678, 0]]
|
||||
|
||||
class TestChryslerSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.safety_set_mode(Panda.SAFETY_CHRYSLER, 0)
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_CHRYSLER, 0)
|
||||
cls.safety.init_tests_chrysler()
|
||||
|
||||
def _send_msg(self, bus, addr, length):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = addr << 21
|
||||
to_send[0].RDTR = length
|
||||
to_send[0].RDTR = bus << 4
|
||||
return to_send
|
||||
|
||||
def _button_msg(self, buttons):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 1265 << 21
|
||||
to_send = make_msg(0, 571)
|
||||
to_send[0].RDLR = buttons
|
||||
return to_send
|
||||
|
||||
@@ -56,17 +34,21 @@ class TestChryslerSafety(unittest.TestCase):
|
||||
self.safety.set_chrysler_torque_meas(t, t)
|
||||
|
||||
def _torque_meas_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 544 << 21
|
||||
to_send = make_msg(0, 544)
|
||||
to_send[0].RDHR = ((torque + 1024) >> 8) + (((torque + 1024) & 0xff) << 8)
|
||||
return to_send
|
||||
|
||||
def _torque_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x292 << 21
|
||||
to_send = make_msg(0, 0x292)
|
||||
to_send[0].RDLR = ((torque + 1024) >> 8) + (((torque + 1024) & 0xff) << 8)
|
||||
return to_send
|
||||
|
||||
def test_spam_can_buses(self):
|
||||
test_spam_can_buses(self, TX_MSGS)
|
||||
|
||||
def test_relay_malfunction(self):
|
||||
test_relay_malfunction(self, 0x292)
|
||||
|
||||
def test_default_controls_not_allowed(self):
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
@@ -81,22 +63,17 @@ class TestChryslerSafety(unittest.TestCase):
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(t)))
|
||||
|
||||
def test_manually_enable_controls_allowed(self):
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.set_controls_allowed(0)
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
test_manually_enable_controls_allowed(self)
|
||||
|
||||
def test_enable_control_allowed_from_cruise(self):
|
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_push[0].RIR = 0x1f4 << 21
|
||||
to_push = make_msg(0, 0x1F4)
|
||||
to_push[0].RDLR = 0x380000
|
||||
|
||||
self.safety.safety_rx_hook(to_push)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
|
||||
def test_disable_control_allowed_from_cruise(self):
|
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_push[0].RIR = 0x1f4 << 21
|
||||
to_push = make_msg(0, 0x1F4)
|
||||
to_push[0].RDLR = 0
|
||||
|
||||
self.safety.set_controls_allowed(1)
|
||||
@@ -179,28 +156,30 @@ class TestChryslerSafety(unittest.TestCase):
|
||||
self.assertEqual(0, self.safety.get_chrysler_torque_meas_max())
|
||||
self.assertEqual(0, self.safety.get_chrysler_torque_meas_min())
|
||||
|
||||
def test_cancel_button(self):
|
||||
CANCEL = 1
|
||||
for b in range(0, 0xff):
|
||||
if b == CANCEL:
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._button_msg(b)))
|
||||
else:
|
||||
self.assertFalse(self.safety.safety_tx_hook(self._button_msg(b)))
|
||||
|
||||
def test_fwd_hook(self):
|
||||
buss = list(range(0x0, 0x3))
|
||||
msgs = list(range(0x1, 0x800))
|
||||
chrysler_camera_detected = [0, 1]
|
||||
|
||||
for ccd in chrysler_camera_detected:
|
||||
self.safety.set_chrysler_camera_detected(ccd)
|
||||
blocked_msgs = [658, 678]
|
||||
for b in buss:
|
||||
for m in msgs:
|
||||
if not ccd:
|
||||
if b == 0:
|
||||
fwd_bus = 2
|
||||
elif b == 1:
|
||||
fwd_bus = -1
|
||||
elif b == 2:
|
||||
fwd_bus = -1 if m in blocked_msgs else 0
|
||||
else:
|
||||
fwd_bus = -1
|
||||
blocked_msgs = [658, 678]
|
||||
for b in buss:
|
||||
for m in msgs:
|
||||
if b == 0:
|
||||
fwd_bus = 2
|
||||
elif b == 1:
|
||||
fwd_bus = -1
|
||||
elif b == 2:
|
||||
fwd_bus = -1 if m in blocked_msgs else 0
|
||||
|
||||
# assume len 8
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, self._send_msg(b, m, 8)))
|
||||
# assume len 8
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, make_msg(b, m, 8)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import numpy as np
|
||||
import libpandasafety_py # pylint: disable=import-error
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import test_relay_malfunction, make_msg, test_manually_enable_controls_allowed, test_spam_can_buses
|
||||
|
||||
MAX_RATE_UP = 7
|
||||
MAX_RATE_DOWN = 17
|
||||
@@ -17,6 +18,11 @@ RT_INTERVAL = 250000
|
||||
DRIVER_TORQUE_ALLOWANCE = 50;
|
||||
DRIVER_TORQUE_FACTOR = 4;
|
||||
|
||||
TX_MSGS = [[384, 0], [1033, 0], [1034, 0], [715, 0], [880, 0], # pt bus
|
||||
[161, 1], [774, 1], [776, 1], [784, 1], # obs bus
|
||||
[789, 2], # ch bus
|
||||
[0x104c006c, 3], [0x10400060]] # gmlan
|
||||
|
||||
def twos_comp(val, bits):
|
||||
if val >= 0:
|
||||
return val
|
||||
@@ -33,50 +39,37 @@ class TestGmSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.safety_set_mode(Panda.SAFETY_GM, 0)
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_GM, 0)
|
||||
cls.safety.init_tests_gm()
|
||||
|
||||
def _send_msg(self, bus, addr, length):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = addr << 21
|
||||
to_send[0].RDTR = length
|
||||
to_send[0].RDTR = bus << 4
|
||||
return to_send
|
||||
|
||||
def _speed_msg(self, speed):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 842 << 21
|
||||
to_send = make_msg(0, 842)
|
||||
to_send[0].RDLR = speed
|
||||
return to_send
|
||||
|
||||
def _button_msg(self, buttons):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 481 << 21
|
||||
to_send = make_msg(0, 481)
|
||||
to_send[0].RDHR = buttons << 12
|
||||
return to_send
|
||||
|
||||
def _brake_msg(self, brake):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 241 << 21
|
||||
to_send = make_msg(0, 241)
|
||||
to_send[0].RDLR = 0xa00 if brake else 0x900
|
||||
return to_send
|
||||
|
||||
def _gas_msg(self, gas):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 417 << 21
|
||||
to_send = make_msg(0, 417)
|
||||
to_send[0].RDHR = (1 << 16) if gas else 0
|
||||
return to_send
|
||||
|
||||
def _send_brake_msg(self, brake):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 789 << 21
|
||||
to_send = make_msg(2, 789)
|
||||
brake = (-brake) & 0xfff
|
||||
to_send[0].RDLR = (brake >> 8) | ((brake &0xff) << 8)
|
||||
return to_send
|
||||
|
||||
def _send_gas_msg(self, gas):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 715 << 21
|
||||
to_send = make_msg(0, 715)
|
||||
to_send[0].RDLR = ((gas & 0x1f) << 27) | ((gas & 0xfe0) << 11)
|
||||
return to_send
|
||||
|
||||
@@ -85,21 +78,23 @@ class TestGmSafety(unittest.TestCase):
|
||||
self.safety.set_gm_rt_torque_last(t)
|
||||
|
||||
def _torque_driver_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 388 << 21
|
||||
|
||||
t = twos_comp(torque, 11)
|
||||
to_send = make_msg(0, 388)
|
||||
to_send[0].RDHR = (((t >> 8) & 0x7) << 16) | ((t & 0xFF) << 24)
|
||||
return to_send
|
||||
|
||||
def _torque_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 384 << 21
|
||||
|
||||
t = twos_comp(torque, 11)
|
||||
to_send = make_msg(0, 384)
|
||||
to_send[0].RDLR = ((t >> 8) & 0x7) | ((t & 0xFF) << 8)
|
||||
return to_send
|
||||
|
||||
def test_spam_can_buses(self):
|
||||
test_spam_can_buses(self, TX_MSGS)
|
||||
|
||||
def test_relay_malfunction(self):
|
||||
test_relay_malfunction(self, 384)
|
||||
|
||||
def test_default_controls_not_allowed(self):
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
@@ -198,10 +193,7 @@ class TestGmSafety(unittest.TestCase):
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(t)))
|
||||
|
||||
def test_manually_enable_controls_allowed(self):
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.set_controls_allowed(0)
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
test_manually_enable_controls_allowed(self)
|
||||
|
||||
def test_non_realtime_limit_up(self):
|
||||
self.safety.set_gm_torque_driver(0, 0)
|
||||
@@ -289,7 +281,7 @@ class TestGmSafety(unittest.TestCase):
|
||||
for b in buss:
|
||||
for m in msgs:
|
||||
# assume len 8
|
||||
self.assertEqual(-1, self.safety.safety_fwd_hook(b, self._send_msg(b, m, 8)))
|
||||
self.assertEqual(-1, self.safety.safety_fwd_hook(b, make_msg(b, m, 8)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,94 +1,79 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import numpy as np
|
||||
import libpandasafety_py # pylint: disable=import-error
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import test_relay_malfunction, make_msg, test_manually_enable_controls_allowed, test_spam_can_buses
|
||||
|
||||
MAX_BRAKE = 255
|
||||
|
||||
INTERCEPTOR_THRESHOLD = 328
|
||||
TX_MSGS = [[0xE4, 0], [0x194, 0], [0x1FA, 0], [0x200, 0], [0x30C, 0], [0x33D, 0], [0x39F, 0]]
|
||||
|
||||
class TestHondaSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.safety_set_mode(Panda.SAFETY_HONDA, 0)
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_HONDA, 0)
|
||||
cls.safety.init_tests_honda()
|
||||
|
||||
def _send_msg(self, bus, addr, length):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = addr << 21
|
||||
to_send[0].RDTR = length
|
||||
to_send[0].RDTR = bus << 4
|
||||
|
||||
return to_send
|
||||
|
||||
def _speed_msg(self, speed):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x158 << 21
|
||||
to_send = make_msg(0, 0x158)
|
||||
to_send[0].RDLR = speed
|
||||
|
||||
return to_send
|
||||
|
||||
def _button_msg(self, buttons, msg):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = msg << 21
|
||||
to_send[0].RDLR = buttons << 5
|
||||
has_relay = self.safety.board_has_relay()
|
||||
honda_bosch_hardware = self.safety.get_honda_bosch_hardware()
|
||||
bus = 1 if has_relay and honda_bosch_hardware else 0
|
||||
to_send[0].RDTR = bus << 4
|
||||
|
||||
to_send = make_msg(bus, msg)
|
||||
to_send[0].RDLR = buttons << 5
|
||||
return to_send
|
||||
|
||||
def _brake_msg(self, brake):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x17C << 21
|
||||
to_send = make_msg(0, 0x17C)
|
||||
to_send[0].RDHR = 0x200000 if brake else 0
|
||||
|
||||
return to_send
|
||||
|
||||
def _alt_brake_msg(self, brake):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x1BE << 21
|
||||
to_send = make_msg(0, 0x1BE)
|
||||
to_send[0].RDLR = 0x10 if brake else 0
|
||||
|
||||
return to_send
|
||||
|
||||
def _gas_msg(self, gas):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x17C << 21
|
||||
to_send = make_msg(0, 0x17C)
|
||||
to_send[0].RDLR = 1 if gas else 0
|
||||
|
||||
return to_send
|
||||
|
||||
def _send_brake_msg(self, brake):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x1FA << 21
|
||||
to_send = make_msg(0, 0x1FA)
|
||||
to_send[0].RDLR = ((brake & 0x3) << 14) | ((brake & 0x3FF) >> 2)
|
||||
|
||||
return to_send
|
||||
|
||||
def _send_interceptor_msg(self, gas, addr):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = addr << 21
|
||||
to_send[0].RDTR = 6
|
||||
to_send = make_msg(0, addr, 6)
|
||||
gas2 = gas * 2
|
||||
to_send[0].RDLR = ((gas & 0xff) << 8) | ((gas & 0xff00) >> 8) | \
|
||||
((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8)
|
||||
|
||||
return to_send
|
||||
|
||||
def _send_steer_msg(self, steer):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0xE4 << 21
|
||||
to_send = make_msg(0, 0xE4, 6)
|
||||
to_send[0].RDLR = steer
|
||||
|
||||
return to_send
|
||||
|
||||
def test_spam_can_buses(self):
|
||||
test_spam_can_buses(self, TX_MSGS)
|
||||
|
||||
def test_relay_malfunction(self):
|
||||
test_relay_malfunction(self, 0xE4)
|
||||
|
||||
def test_default_controls_not_allowed(self):
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_manually_enable_controls_allowed(self):
|
||||
test_manually_enable_controls_allowed(self)
|
||||
|
||||
def test_resume_button(self):
|
||||
RESUME_BTN = 4
|
||||
self.safety.set_controls_allowed(0)
|
||||
@@ -281,7 +266,7 @@ class TestHondaSafety(unittest.TestCase):
|
||||
fwd_bus = -1 if m in blocked_msgs else 0
|
||||
|
||||
# assume len 8
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, self._send_msg(b, m, 8)))
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, make_msg(b, m, 8)))
|
||||
|
||||
self.safety.set_long_controls_allowed(True)
|
||||
self.safety.set_honda_fwd_brake(False)
|
||||
|
||||
@@ -1,29 +1,31 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import libpandasafety_py # pylint: disable=import-error
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import make_msg, test_spam_can_buses
|
||||
|
||||
MAX_BRAKE = 255
|
||||
|
||||
H_TX_MSGS = [[0xE4, 0], [0x296, 1], [0x33D, 0]] # Bosch Harness
|
||||
G_TX_MSGS = [[0xE4, 2], [0x296, 0], [0x33D, 2]] # Bosch Giraffe
|
||||
|
||||
|
||||
class TestHondaSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.safety_set_mode(Panda.SAFETY_HONDA_BOSCH, 0)
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_HONDA_BOSCH, 0)
|
||||
cls.safety.init_tests_honda()
|
||||
|
||||
def _send_msg(self, bus, addr, length):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = addr << 21
|
||||
to_send[0].RDTR = length
|
||||
to_send[0].RDTR = bus << 4
|
||||
|
||||
return to_send
|
||||
def test_spam_can_buses(self):
|
||||
if self.safety.board_has_relay():
|
||||
test_spam_can_buses(self, H_TX_MSGS)
|
||||
else:
|
||||
test_spam_can_buses(self, G_TX_MSGS)
|
||||
|
||||
def test_fwd_hook(self):
|
||||
buss = range(0x0, 0x3)
|
||||
msgs = range(0x1, 0x800)
|
||||
#has_relay = self.safety.get_hw_type() == 3 # black panda
|
||||
has_relay = self.safety.board_has_relay()
|
||||
bus_rdr_cam = 2 if has_relay else 1
|
||||
bus_rdr_car = 0 if has_relay else 2
|
||||
@@ -40,7 +42,7 @@ class TestHondaSafety(unittest.TestCase):
|
||||
fwd_bus = bus_rdr_cam
|
||||
|
||||
# assume len 8
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, self._send_msg(b, m, 8)))
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, make_msg(b, m, 8)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import numpy as np
|
||||
import libpandasafety_py # pylint: disable=import-error
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import test_relay_malfunction, make_msg, test_manually_enable_controls_allowed, test_spam_can_buses
|
||||
|
||||
MAX_RATE_UP = 3
|
||||
MAX_RATE_DOWN = 7
|
||||
@@ -14,6 +15,8 @@ RT_INTERVAL = 250000
|
||||
DRIVER_TORQUE_ALLOWANCE = 50;
|
||||
DRIVER_TORQUE_FACTOR = 2;
|
||||
|
||||
TX_MSGS = [[832, 0], [1265, 0]]
|
||||
|
||||
def twos_comp(val, bits):
|
||||
if val >= 0:
|
||||
return val
|
||||
@@ -30,19 +33,11 @@ class TestHyundaiSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.safety_set_mode(Panda.SAFETY_HYUNDAI, 0)
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_HYUNDAI, 0)
|
||||
cls.safety.init_tests_hyundai()
|
||||
|
||||
def _send_msg(self, bus, addr, length):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = addr << 21
|
||||
to_send[0].RDTR = length
|
||||
to_send[0].RDTR = bus << 4
|
||||
return to_send
|
||||
|
||||
def _button_msg(self, buttons):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 1265 << 21
|
||||
to_send = make_msg(0, 1265)
|
||||
to_send[0].RDLR = buttons
|
||||
return to_send
|
||||
|
||||
@@ -51,17 +46,21 @@ class TestHyundaiSafety(unittest.TestCase):
|
||||
self.safety.set_hyundai_rt_torque_last(t)
|
||||
|
||||
def _torque_driver_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 897 << 21
|
||||
to_send = make_msg(0, 897)
|
||||
to_send[0].RDLR = (torque + 2048) << 11
|
||||
return to_send
|
||||
|
||||
def _torque_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 832 << 21
|
||||
to_send = make_msg(0, 832)
|
||||
to_send[0].RDLR = (torque + 1024) << 16
|
||||
return to_send
|
||||
|
||||
def test_spam_can_buses(self):
|
||||
test_spam_can_buses(self, TX_MSGS)
|
||||
|
||||
def test_relay_malfunction(self):
|
||||
test_relay_malfunction(self, 832)
|
||||
|
||||
def test_default_controls_not_allowed(self):
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
@@ -76,24 +75,16 @@ class TestHyundaiSafety(unittest.TestCase):
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(t)))
|
||||
|
||||
def test_manually_enable_controls_allowed(self):
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.set_controls_allowed(0)
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
test_manually_enable_controls_allowed(self)
|
||||
|
||||
def test_enable_control_allowed_from_cruise(self):
|
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_push[0].RIR = 1057 << 21
|
||||
to_push = make_msg(0, 1057)
|
||||
to_push[0].RDLR = 1 << 13
|
||||
|
||||
self.safety.safety_rx_hook(to_push)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
|
||||
def test_disable_control_allowed_from_cruise(self):
|
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_push[0].RIR = 1057 << 21
|
||||
to_push[0].RDLR = 0
|
||||
|
||||
to_push = make_msg(0, 1057)
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.safety.safety_rx_hook(to_push)
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
@@ -176,43 +167,35 @@ class TestHyundaiSafety(unittest.TestCase):
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(sign * (MAX_RT_DELTA + 1))))
|
||||
|
||||
|
||||
#def test_spam_cancel_safety_check(self):
|
||||
# RESUME_BTN = 1
|
||||
# SET_BTN = 2
|
||||
# CANCEL_BTN = 4
|
||||
# BUTTON_MSG = 1265
|
||||
# self.safety.set_controls_allowed(0)
|
||||
# self.assertTrue(self.safety.safety_tx_hook(self._button_msg(CANCEL_BTN)))
|
||||
# self.assertFalse(self.safety.safety_tx_hook(self._button_msg(RESUME_BTN)))
|
||||
# self.assertFalse(self.safety.safety_tx_hook(self._button_msg(SET_BTN)))
|
||||
# # do not block resume if we are engaged already
|
||||
# self.safety.set_controls_allowed(1)
|
||||
# self.assertTrue(self.safety.safety_tx_hook(self._button_msg(RESUME_BTN)))
|
||||
def test_spam_cancel_safety_check(self):
|
||||
RESUME_BTN = 1
|
||||
SET_BTN = 2
|
||||
CANCEL_BTN = 4
|
||||
self.safety.set_controls_allowed(0)
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._button_msg(CANCEL_BTN)))
|
||||
self.assertFalse(self.safety.safety_tx_hook(self._button_msg(RESUME_BTN)))
|
||||
self.assertFalse(self.safety.safety_tx_hook(self._button_msg(SET_BTN)))
|
||||
# do not block resume if we are engaged already
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._button_msg(RESUME_BTN)))
|
||||
|
||||
def test_fwd_hook(self):
|
||||
|
||||
buss = list(range(0x0, 0x3))
|
||||
msgs = list(range(0x1, 0x800))
|
||||
hyundai_giraffe_switch_2 = [0, 1]
|
||||
|
||||
self.safety.set_hyundai_camera_bus(2)
|
||||
for hgs in hyundai_giraffe_switch_2:
|
||||
self.safety.set_hyundai_giraffe_switch_2(hgs)
|
||||
blocked_msgs = [832]
|
||||
for b in buss:
|
||||
for m in msgs:
|
||||
if hgs:
|
||||
if b == 0:
|
||||
fwd_bus = 2
|
||||
elif b == 1:
|
||||
fwd_bus = -1
|
||||
elif b == 2:
|
||||
fwd_bus = -1 if m in blocked_msgs else 0
|
||||
else:
|
||||
fwd_bus = -1
|
||||
blocked_msgs = [832]
|
||||
for b in buss:
|
||||
for m in msgs:
|
||||
if b == 0:
|
||||
fwd_bus = 2
|
||||
elif b == 1:
|
||||
fwd_bus = -1
|
||||
elif b == 2:
|
||||
fwd_bus = -1 if m in blocked_msgs else 0
|
||||
|
||||
# assume len 8
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, self._send_msg(b, m, 8)))
|
||||
# assume len 8
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, make_msg(b, m, 8)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import numpy as np
|
||||
import libpandasafety_py # pylint: disable=import-error
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import test_relay_malfunction, make_msg, test_manually_enable_controls_allowed, test_spam_can_buses
|
||||
|
||||
MAX_RATE_UP = 50
|
||||
MAX_RATE_DOWN = 70
|
||||
@@ -14,6 +15,8 @@ RT_INTERVAL = 250000
|
||||
DRIVER_TORQUE_ALLOWANCE = 60;
|
||||
DRIVER_TORQUE_FACTOR = 10;
|
||||
|
||||
TX_MSGS = [[0x122, 0], [0x164, 0], [0x221, 0], [0x322, 0]]
|
||||
|
||||
def twos_comp(val, bits):
|
||||
if val >= 0:
|
||||
return val
|
||||
@@ -30,52 +33,43 @@ class TestSubaruSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.safety_set_mode(Panda.SAFETY_SUBARU, 0)
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_SUBARU, 0)
|
||||
cls.safety.init_tests_subaru()
|
||||
|
||||
def _send_msg(self, bus, addr, length):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = addr << 21
|
||||
to_send[0].RDTR = length
|
||||
to_send[0].RDTR = bus << 4
|
||||
return to_send
|
||||
|
||||
def _set_prev_torque(self, t):
|
||||
self.safety.set_subaru_desired_torque_last(t)
|
||||
self.safety.set_subaru_rt_torque_last(t)
|
||||
|
||||
def _torque_driver_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x119 << 21
|
||||
|
||||
t = twos_comp(torque, 11)
|
||||
to_send = make_msg(0, 0x119)
|
||||
to_send[0].RDLR = ((t & 0x7FF) << 16)
|
||||
return to_send
|
||||
|
||||
def _torque_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x122 << 21
|
||||
|
||||
to_send = make_msg(0, 0x122)
|
||||
t = twos_comp(torque, 13)
|
||||
to_send[0].RDLR = (t << 16)
|
||||
return to_send
|
||||
|
||||
def test_spam_can_buses(self):
|
||||
test_spam_can_buses(self, TX_MSGS)
|
||||
|
||||
def test_relay_malfunction(self):
|
||||
test_relay_malfunction(self, 0x122)
|
||||
|
||||
def test_default_controls_not_allowed(self):
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_enable_control_allowed_from_cruise(self):
|
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_push[0].RIR = 0x240 << 21
|
||||
to_push = make_msg(0, 0x240)
|
||||
to_push[0].RDHR = 1 << 9
|
||||
|
||||
self.safety.safety_rx_hook(to_push)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
|
||||
def test_disable_control_allowed_from_cruise(self):
|
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_push[0].RIR = 0x240 << 21
|
||||
to_push = make_msg(0, 0x240)
|
||||
to_push[0].RDHR = 0
|
||||
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.safety.safety_rx_hook(to_push)
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
@@ -91,10 +85,7 @@ class TestSubaruSafety(unittest.TestCase):
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(t)))
|
||||
|
||||
def test_manually_enable_controls_allowed(self):
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.set_controls_allowed(0)
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
test_manually_enable_controls_allowed(self)
|
||||
|
||||
def test_non_realtime_limit_up(self):
|
||||
self.safety.set_subaru_torque_driver(0, 0)
|
||||
@@ -188,7 +179,7 @@ class TestSubaruSafety(unittest.TestCase):
|
||||
fwd_bus = -1 if m in blocked_msgs else 0
|
||||
|
||||
# assume len 8
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, self._send_msg(b, m, 8)))
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, make_msg(b, m, 8)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import numpy as np
|
||||
import libpandasafety_py # pylint: disable=import-error
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import test_relay_malfunction, make_msg, test_manually_enable_controls_allowed, test_spam_can_buses
|
||||
|
||||
MAX_RATE_UP = 10
|
||||
MAX_RATE_DOWN = 25
|
||||
@@ -15,9 +16,14 @@ MAX_RT_DELTA = 375
|
||||
RT_INTERVAL = 250000
|
||||
|
||||
MAX_TORQUE_ERROR = 350
|
||||
|
||||
INTERCEPTOR_THRESHOLD = 475
|
||||
|
||||
TX_MSGS = [[0x283, 0], [0x2E6, 0], [0x2E7, 0], [0x33E, 0], [0x344, 0], [0x365, 0], [0x366, 0], [0x4CB, 0], # DSU bus 0
|
||||
[0x128, 1], [0x141, 1], [0x160, 1], [0x161, 1], [0x470, 1], # DSU bus 1
|
||||
[0x2E4, 0], [0x411, 0], [0x412, 0], [0x343, 0], [0x1D2, 0], # LKAS + ACC
|
||||
[0x200, 0]]; # interceptor
|
||||
|
||||
|
||||
def twos_comp(val, bits):
|
||||
if val >= 0:
|
||||
return val
|
||||
@@ -34,75 +40,60 @@ class TestToyotaSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.safety_set_mode(Panda.SAFETY_TOYOTA, 100)
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_TOYOTA, 100)
|
||||
cls.safety.init_tests_toyota()
|
||||
|
||||
def _send_msg(self, bus, addr, length):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = addr << 21
|
||||
to_send[0].RDTR = length
|
||||
to_send[0].RDTR = bus << 4
|
||||
return to_send
|
||||
|
||||
def _set_prev_torque(self, t):
|
||||
self.safety.set_toyota_desired_torque_last(t)
|
||||
self.safety.set_toyota_rt_torque_last(t)
|
||||
self.safety.set_toyota_torque_meas(t, t)
|
||||
|
||||
def _torque_meas_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x260 << 21
|
||||
|
||||
t = twos_comp(torque, 16)
|
||||
to_send = make_msg(0, 0x260)
|
||||
to_send[0].RDHR = t | ((t & 0xFF) << 16)
|
||||
return to_send
|
||||
|
||||
def _torque_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x2E4 << 21
|
||||
|
||||
t = twos_comp(torque, 16)
|
||||
to_send = make_msg(0, 0x2E4)
|
||||
to_send[0].RDLR = t | ((t & 0xFF) << 16)
|
||||
return to_send
|
||||
|
||||
def _accel_msg(self, accel):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x343 << 21
|
||||
|
||||
to_send = make_msg(0, 0x343)
|
||||
a = twos_comp(accel, 16)
|
||||
to_send[0].RDLR = (a & 0xFF) << 8 | (a >> 8)
|
||||
return to_send
|
||||
|
||||
def _send_gas_msg(self, gas):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x2C1 << 21
|
||||
to_send = make_msg(0, 0x2C1)
|
||||
to_send[0].RDHR = (gas & 0xFF) << 16
|
||||
|
||||
return to_send
|
||||
|
||||
def _send_interceptor_msg(self, gas, addr):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = addr << 21
|
||||
to_send[0].RDTR = 6
|
||||
gas2 = gas * 2
|
||||
to_send = make_msg(0, addr, 6)
|
||||
to_send[0].RDLR = ((gas & 0xff) << 8) | ((gas & 0xff00) >> 8) | \
|
||||
((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8)
|
||||
|
||||
return to_send
|
||||
|
||||
def _pcm_cruise_msg(self, cruise_on):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x1D2 << 21
|
||||
to_send = make_msg(0, 0x1D2)
|
||||
to_send[0].RDLR = cruise_on << 5
|
||||
|
||||
return to_send
|
||||
|
||||
def test_spam_can_buses(self):
|
||||
test_spam_can_buses(self, TX_MSGS)
|
||||
|
||||
def test_relay_malfunction(self):
|
||||
test_relay_malfunction(self, 0x2E4)
|
||||
|
||||
def test_default_controls_not_allowed(self):
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_manually_enable_controls_allowed(self):
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
test_manually_enable_controls_allowed(self)
|
||||
|
||||
def test_enable_control_allowed_from_cruise(self):
|
||||
self.safety.safety_rx_hook(self._pcm_cruise_msg(False))
|
||||
@@ -285,29 +276,23 @@ class TestToyotaSafety(unittest.TestCase):
|
||||
buss = list(range(0x0, 0x3))
|
||||
msgs = list(range(0x1, 0x800))
|
||||
long_controls_allowed = [0, 1]
|
||||
toyota_camera_forwarded = [0, 1]
|
||||
|
||||
for tcf in toyota_camera_forwarded:
|
||||
self.safety.set_toyota_camera_forwarded(tcf)
|
||||
for lca in long_controls_allowed:
|
||||
self.safety.set_long_controls_allowed(lca)
|
||||
blocked_msgs = [0x2E4, 0x412, 0x191]
|
||||
if lca:
|
||||
blocked_msgs += [0x343]
|
||||
for b in buss:
|
||||
for m in msgs:
|
||||
if tcf:
|
||||
if b == 0:
|
||||
fwd_bus = 2
|
||||
elif b == 1:
|
||||
fwd_bus = -1
|
||||
elif b == 2:
|
||||
fwd_bus = -1 if m in blocked_msgs else 0
|
||||
else:
|
||||
fwd_bus = -1
|
||||
for lca in long_controls_allowed:
|
||||
self.safety.set_long_controls_allowed(lca)
|
||||
blocked_msgs = [0x2E4, 0x412, 0x191]
|
||||
if lca:
|
||||
blocked_msgs += [0x343]
|
||||
for b in buss:
|
||||
for m in msgs:
|
||||
if b == 0:
|
||||
fwd_bus = 2
|
||||
elif b == 1:
|
||||
fwd_bus = -1
|
||||
elif b == 2:
|
||||
fwd_bus = -1 if m in blocked_msgs else 0
|
||||
|
||||
# assume len 8
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, self._send_msg(b, m, 8)))
|
||||
# assume len 8
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, make_msg(b, m, 8)))
|
||||
|
||||
self.safety.set_long_controls_allowed(True)
|
||||
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import numpy as np
|
||||
import libpandasafety_py # pylint: disable=import-error
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests. safety.common import make_msg
|
||||
|
||||
IPAS_OVERRIDE_THRESHOLD = 200
|
||||
|
||||
@@ -26,13 +27,11 @@ class TestToyotaSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.safety_set_mode(Panda.SAFETY_TOYOTA_IPAS, 66)
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_TOYOTA_IPAS, 66)
|
||||
cls.safety.init_tests_toyota()
|
||||
|
||||
def _torque_driver_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x260 << 21
|
||||
|
||||
to_send = make_msg(0, 0x260)
|
||||
t = twos_comp(torque, 16)
|
||||
to_send[0].RDLR = t | ((t & 0xFF) << 16)
|
||||
return to_send
|
||||
@@ -42,9 +41,7 @@ class TestToyotaSafety(unittest.TestCase):
|
||||
self.safety.safety_rx_hook(self._torque_driver_msg(torque))
|
||||
|
||||
def _angle_meas_msg(self, angle):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x25 << 21
|
||||
|
||||
to_send = make_msg(0, 0x25)
|
||||
t = twos_comp(angle, 12)
|
||||
to_send[0].RDLR = ((t & 0xF00) >> 8) | ((t & 0xFF) << 8)
|
||||
return to_send
|
||||
@@ -54,17 +51,13 @@ class TestToyotaSafety(unittest.TestCase):
|
||||
self.safety.safety_rx_hook(self._angle_meas_msg(angle))
|
||||
|
||||
def _ipas_state_msg(self, state):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x262 << 21
|
||||
|
||||
to_send = make_msg(0, 0x262)
|
||||
to_send[0].RDLR = state & 0xF
|
||||
return to_send
|
||||
|
||||
def _ipas_control_msg(self, angle, state):
|
||||
# note: we command 2/3 of the angle due to CAN conversion
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x266 << 21
|
||||
|
||||
to_send = make_msg(0, 0x266)
|
||||
t = twos_comp(angle, 12)
|
||||
to_send[0].RDLR = ((t & 0xF00) >> 8) | ((t & 0xFF) << 8)
|
||||
to_send[0].RDLR |= ((state & 0xf) << 4)
|
||||
@@ -72,8 +65,7 @@ class TestToyotaSafety(unittest.TestCase):
|
||||
return to_send
|
||||
|
||||
def _speed_msg(self, speed):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0xb4 << 21
|
||||
to_send = make_msg(0, 0xB4)
|
||||
speed = int(speed * 100 * 3.6)
|
||||
|
||||
to_send[0].RDHR = ((speed & 0xFF) << 16) | (speed & 0xFF00)
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import numpy as np
|
||||
import libpandasafety_py # pylint: disable=import-error
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import test_relay_malfunction, make_msg, test_manually_enable_controls_allowed, test_spam_can_buses
|
||||
|
||||
MAX_RATE_UP = 4
|
||||
MAX_RATE_DOWN = 10
|
||||
@@ -14,6 +15,8 @@ RT_INTERVAL = 250000
|
||||
DRIVER_TORQUE_ALLOWANCE = 80
|
||||
DRIVER_TORQUE_FACTOR = 3
|
||||
|
||||
TX_MSGS = [[0x126, 0], [0x12B, 0], [0x12B, 2], [0x397, 0]]
|
||||
|
||||
def sign(a):
|
||||
if a > 0:
|
||||
return 1
|
||||
@@ -24,24 +27,15 @@ class TestVolkswagenSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.safety_set_mode(Panda.SAFETY_VOLKSWAGEN, 0)
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_VOLKSWAGEN, 0)
|
||||
cls.safety.init_tests_volkswagen()
|
||||
|
||||
def _send_msg(self, bus, addr, length):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = addr << 21
|
||||
to_send[0].RDTR = length
|
||||
to_send[0].RDTR = bus << 4
|
||||
return to_send
|
||||
|
||||
def _set_prev_torque(self, t):
|
||||
self.safety.set_volkswagen_desired_torque_last(t)
|
||||
self.safety.set_volkswagen_rt_torque_last(t)
|
||||
|
||||
def _torque_driver_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x9F << 21
|
||||
|
||||
to_send = make_msg(0, 0x9F)
|
||||
t = abs(torque)
|
||||
to_send[0].RDHR = ((t & 0x1FFF) << 8)
|
||||
if torque < 0:
|
||||
@@ -49,9 +43,7 @@ class TestVolkswagenSafety(unittest.TestCase):
|
||||
return to_send
|
||||
|
||||
def _torque_msg(self, torque):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x126 << 21
|
||||
|
||||
to_send = make_msg(0, 0x126)
|
||||
t = abs(torque)
|
||||
to_send[0].RDLR = (t & 0xFFF) << 16
|
||||
if torque < 0:
|
||||
@@ -59,20 +51,21 @@ class TestVolkswagenSafety(unittest.TestCase):
|
||||
return to_send
|
||||
|
||||
def _gas_msg(self, gas):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x121 << 21
|
||||
to_send = make_msg(0, 0x121)
|
||||
to_send[0].RDLR = (gas & 0xFF) << 12
|
||||
|
||||
return to_send
|
||||
|
||||
def _button_msg(self, bit):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_send[0].RIR = 0x12B << 21
|
||||
to_send = make_msg(2, 0x12B)
|
||||
to_send[0].RDLR = 1 << bit
|
||||
to_send[0].RDTR = 2 << 4
|
||||
|
||||
return to_send
|
||||
|
||||
def test_spam_can_buses(self):
|
||||
test_spam_can_buses(self, TX_MSGS)
|
||||
|
||||
def test_relay_malfunction(self):
|
||||
test_relay_malfunction(self, 0x126)
|
||||
|
||||
def test_prev_gas(self):
|
||||
for g in range(0, 256):
|
||||
self.safety.safety_rx_hook(self._gas_msg(g))
|
||||
@@ -82,18 +75,13 @@ class TestVolkswagenSafety(unittest.TestCase):
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_enable_control_allowed_from_cruise(self):
|
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_push[0].RIR = 0x122 << 21
|
||||
to_push = make_msg(0, 0x122)
|
||||
to_push[0].RDHR = 0x30000000
|
||||
|
||||
self.safety.safety_rx_hook(to_push)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
|
||||
def test_disable_control_allowed_from_cruise(self):
|
||||
to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
to_push[0].RIR = 0x122 << 21
|
||||
to_push[0].RDHR = 0
|
||||
|
||||
to_push = make_msg(0, 0x122)
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.safety.safety_rx_hook(to_push)
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
@@ -130,10 +118,7 @@ class TestVolkswagenSafety(unittest.TestCase):
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(t)))
|
||||
|
||||
def test_manually_enable_controls_allowed(self):
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.set_controls_allowed(0)
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
test_manually_enable_controls_allowed(self)
|
||||
|
||||
def test_spam_cancel_safety_check(self):
|
||||
BIT_CANCEL = 13
|
||||
@@ -226,10 +211,10 @@ class TestVolkswagenSafety(unittest.TestCase):
|
||||
|
||||
|
||||
def test_fwd_hook(self):
|
||||
buss = list(range(0x0, 0x2))
|
||||
buss = list(range(0x0, 0x3))
|
||||
msgs = list(range(0x1, 0x800))
|
||||
blocked_msgs_0to2 = []
|
||||
blocked_msgs_2to0 = [0x122, 0x397]
|
||||
blocked_msgs_2to0 = [0x126, 0x397]
|
||||
for b in buss:
|
||||
for m in msgs:
|
||||
if b == 0:
|
||||
@@ -240,7 +225,7 @@ class TestVolkswagenSafety(unittest.TestCase):
|
||||
fwd_bus = -1 if m in blocked_msgs_2to0 else 0
|
||||
|
||||
# assume len 8
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, self._send_msg(b, m, 8)))
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, make_msg(b, m, 8)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -59,11 +59,13 @@ def set_desired_torque_last(safety, mode, torque):
|
||||
safety.set_subaru_desired_torque_last(torque)
|
||||
|
||||
def package_can_msg(msg):
|
||||
addr_shift = 3 if msg.address >= 0x800 else 21
|
||||
rdlr, rdhr = struct.unpack('II', msg.dat.ljust(8, b'\x00'))
|
||||
|
||||
ret = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
ret[0].RIR = msg.address << addr_shift
|
||||
if msg.address >= 0x800:
|
||||
ret[0].RIR = (msg.address << 3) | 5
|
||||
else:
|
||||
ret[0].RIR = (msg.address << 21) | 1
|
||||
ret[0].RDTR = len(msg.dat) | ((msg.src & 0xF) << 4)
|
||||
ret[0].RDHR = rdhr
|
||||
ret[0].RDLR = rdlr
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
import panda.tests.safety.libpandasafety_py as libpandasafety_py
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety_replay.helpers import package_can_msg, init_segment
|
||||
from tools.lib.logreader import LogReader # pylint: disable=import-error
|
||||
|
||||
@@ -10,7 +10,7 @@ from tools.lib.logreader import LogReader # pylint: disable=import-error
|
||||
def replay_drive(lr, safety_mode, param):
|
||||
safety = libpandasafety_py.libpandasafety
|
||||
|
||||
err = safety.safety_set_mode(safety_mode, param)
|
||||
err = safety.set_safety_hooks(safety_mode, param)
|
||||
assert err == 0, "invalid safety mode: %d" % safety_mode
|
||||
|
||||
if "SEGMENT" in os.environ:
|
||||
@@ -35,7 +35,7 @@ def replay_drive(lr, safety_mode, param):
|
||||
blocked_addrs.add(canmsg.address)
|
||||
|
||||
if "DEBUG" in os.environ:
|
||||
print("blocked %d at %f" % (canmsg.address, (msg.logMonoTime - start_t)/(1e9)))
|
||||
print("blocked bus %d msg %d at %f" % (canmsg.src, canmsg.address, (msg.logMonoTime - start_t)/(1e9)))
|
||||
tx_controls += safety.get_controls_allowed()
|
||||
tx_tot += 1
|
||||
elif msg.which() == 'can':
|
||||
|
||||
@@ -16,8 +16,9 @@ logs = [
|
||||
("f89c604cf653e2bf|2018-09-29--13-46-50.bz2", Panda.SAFETY_GM, 0), # GM.VOLT
|
||||
("0375fdf7b1ce594d|2019-05-21--20-10-33.bz2", Panda.SAFETY_HONDA_BOSCH, 1), # HONDA.ACCORD
|
||||
("02ec6bea180a4d36|2019-04-17--11-21-35.bz2", Panda.SAFETY_HYUNDAI, 0), # HYUNDAI.SANTA_FE
|
||||
("03efb1fda29e30fe|2019-02-21--18-03-45.bz2", Panda.SAFETY_CHRYSLER, 0), # CHRYSLER.PACIFICA_2018_HYBRID
|
||||
("6fb4948a7ebe670e|2019-11-12--00-35-53.bz2", Panda.SAFETY_CHRYSLER, 0), # CHRYSLER.PACIFICA_2018_HYBRID
|
||||
("791340bc01ed993d|2019-04-08--10-26-00.bz2", Panda.SAFETY_SUBARU, 0), # SUBARU.IMPREZA
|
||||
("b0c9d2329ad1606b|2019-11-17--17-06-13.bz2", Panda.SAFETY_VOLKSWAGEN, 0), # VOLKSWAGEN.GOLF
|
||||
]
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user