mirror of https://github.com/commaai/panda.git
Add pedal tests to CI (#680)
* Add pedal tests to CI * Organized as unittest * Linter and imports * Ignore 'none' serials * Force into DFU * Check in softloader mode * Pin jungle commit * Add silent mode to pedal * Move pedal serial to const * split CIs * full path to canhandle * Revert adding silent mode to pedal * improve * increase timeout pedal test
This commit is contained in:
parent
7d93e5a202
commit
ada4f7e177
|
@ -65,6 +65,9 @@ RUN pip install -r /tmp/requirements.txt
|
|||
|
||||
ENV PYTHONPATH /tmp:$PYTHONPATH
|
||||
|
||||
RUN cd /tmp && git clone https://github.com/commaai/panda_jungle.git
|
||||
RUN cd /tmp && git clone https://github.com/commaai/panda_jungle.git && \
|
||||
cd panda_jungle && \
|
||||
git fetch && \
|
||||
git checkout 7b7197c605915ac34f3d62f314edd84e2e78a759
|
||||
|
||||
ADD ./panda.tar.gz /tmp/panda
|
||||
|
|
|
@ -14,6 +14,22 @@ pipeline {
|
|||
}
|
||||
}
|
||||
}
|
||||
stage('PEDAL tests') {
|
||||
steps {
|
||||
lock(resource: "pedal", inversePrecedence: true, quantity: 1) {
|
||||
timeout(time: 10, unit: 'MINUTES') {
|
||||
script {
|
||||
sh "docker run --rm --privileged \
|
||||
--volume /dev/bus/usb:/dev/bus/usb \
|
||||
--volume /var/run/dbus:/var/run/dbus \
|
||||
--net host \
|
||||
${env.DOCKER_IMAGE_TAG} \
|
||||
bash -c 'cd /tmp/panda && PEDAL_JUNGLE=23002d000851393038373731 python ./tests/pedal/test_pedal.py'"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
stage('HITL tests') {
|
||||
steps {
|
||||
lock(resource: "pandas", inversePrecedence: true, quantity: 1) {
|
||||
|
@ -24,7 +40,7 @@ pipeline {
|
|||
--volume /var/run/dbus:/var/run/dbus \
|
||||
--net host \
|
||||
${env.DOCKER_IMAGE_TAG} \
|
||||
bash -c 'cd /tmp/panda && scons && ./tests/automated/test.sh'"
|
||||
bash -c 'cd /tmp/panda && scons && PANDAS_JUNGLE=058010800f51363038363036 ./tests/automated/test.sh'"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@ if os.getenv("PEDAL"):
|
|||
"-O2",
|
||||
"-DPEDAL",
|
||||
]
|
||||
if os.getenv("PEDAL_USB"):
|
||||
PROJECT_FLAGS.append("-DPEDAL_USB")
|
||||
|
||||
else:
|
||||
PROJECT = "panda"
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import os
|
||||
import time
|
||||
import random
|
||||
import _thread
|
||||
|
@ -15,12 +16,14 @@ BUS_SPEEDS = [(0, SPEED_NORMAL), (1, SPEED_NORMAL), (2, SPEED_NORMAL), (3, SPEED
|
|||
TIMEOUT = 45
|
||||
GEN2_HW_TYPES = [Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_UNO]
|
||||
GPS_HW_TYPES = [Panda.HW_TYPE_GREY_PANDA, Panda.HW_TYPE_BLACK_PANDA, Panda.HW_TYPE_UNO]
|
||||
PEDAL_SERIAL = 'none'
|
||||
JUNGLE_SERIAL = os.getenv("PANDAS_JUNGLE")
|
||||
|
||||
# Enable fault debug
|
||||
faulthandler.enable(all_threads=False)
|
||||
|
||||
# Connect to Panda Jungle
|
||||
panda_jungle = PandaJungle()
|
||||
panda_jungle = PandaJungle(JUNGLE_SERIAL)
|
||||
|
||||
# Find all panda's connected
|
||||
_panda_serials = None
|
||||
|
@ -44,7 +47,7 @@ test_all_types = parameterized([
|
|||
param(panda_type=Panda.HW_TYPE_UNO)
|
||||
])
|
||||
test_all_pandas = parameterized(
|
||||
list(map(lambda x: x[0], _panda_serials)) # type: ignore
|
||||
list(map(lambda x: x[0], filter(lambda x: x[0] != PEDAL_SERIAL, _panda_serials))) # type: ignore
|
||||
)
|
||||
test_all_gen2_pandas = parameterized(
|
||||
list(map(lambda x: x[0], filter(lambda x: x[1] in GEN2_HW_TYPES, _panda_serials))) # type: ignore
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
import struct
|
||||
import signal
|
||||
|
||||
|
||||
class CanHandle(object):
|
||||
def __init__(self, p, bus):
|
||||
self.p = p
|
||||
self.bus = bus
|
||||
|
||||
def transact(self, dat):
|
||||
self.p.isotp_send(1, dat, self.bus, recvaddr=2)
|
||||
|
||||
def _handle_timeout(signum, frame):
|
||||
# will happen on reset
|
||||
raise Exception("timeout")
|
||||
|
||||
signal.signal(signal.SIGALRM, _handle_timeout)
|
||||
signal.alarm(1)
|
||||
try:
|
||||
ret = self.p.isotp_recv(2, self.bus, sendaddr=1)
|
||||
finally:
|
||||
signal.alarm(0)
|
||||
|
||||
return ret
|
||||
|
||||
def controlWrite(self, request_type, request, value, index, data, timeout=0):
|
||||
# ignore data in reply, panda doesn't use it
|
||||
return self.controlRead(request_type, request, value, index, 0, timeout)
|
||||
|
||||
def controlRead(self, request_type, request, value, index, length, timeout=0):
|
||||
dat = struct.pack("HHBBHHH", 0, 0, request_type, request, value, index, length)
|
||||
return self.transact(dat)
|
||||
|
||||
def bulkWrite(self, endpoint, data, timeout=0):
|
||||
if len(data) > 0x10:
|
||||
raise ValueError("Data must not be longer than 0x10")
|
||||
dat = struct.pack("HH", endpoint, len(data)) + data
|
||||
return self.transact(dat)
|
||||
|
||||
def bulkRead(self, endpoint, length, timeout=0):
|
||||
dat = struct.pack("HH", endpoint, 0)
|
||||
return self.transact(dat)
|
|
@ -1,47 +1,9 @@
|
|||
#!/usr/bin/env python3
|
||||
import time
|
||||
import struct
|
||||
import argparse
|
||||
import signal
|
||||
from panda import Panda
|
||||
from panda.tests.pedal.canhandle import CanHandle
|
||||
|
||||
class CanHandle(object):
|
||||
def __init__(self, p):
|
||||
self.p = p
|
||||
|
||||
def transact(self, dat):
|
||||
self.p.isotp_send(1, dat, 0, recvaddr=2)
|
||||
|
||||
def _handle_timeout(signum, frame):
|
||||
# will happen on reset
|
||||
raise Exception("timeout")
|
||||
|
||||
signal.signal(signal.SIGALRM, _handle_timeout)
|
||||
signal.alarm(1)
|
||||
try:
|
||||
ret = self.p.isotp_recv(2, 0, sendaddr=1)
|
||||
finally:
|
||||
signal.alarm(0)
|
||||
|
||||
return ret
|
||||
|
||||
def controlWrite(self, request_type, request, value, index, data, timeout=0):
|
||||
# ignore data in reply, panda doesn't use it
|
||||
return self.controlRead(request_type, request, value, index, 0, timeout)
|
||||
|
||||
def controlRead(self, request_type, request, value, index, length, timeout=0):
|
||||
dat = struct.pack("HHBBHHH", 0, 0, request_type, request, value, index, length)
|
||||
return self.transact(dat)
|
||||
|
||||
def bulkWrite(self, endpoint, data, timeout=0):
|
||||
if len(data) > 0x10:
|
||||
raise ValueError("Data must not be longer than 0x10")
|
||||
dat = struct.pack("HH", endpoint, len(data)) + data
|
||||
return self.transact(dat)
|
||||
|
||||
def bulkRead(self, endpoint, length, timeout=0):
|
||||
dat = struct.pack("HH", endpoint, 0)
|
||||
return self.transact(dat)
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description='Flash pedal over can')
|
||||
|
@ -66,6 +28,6 @@ if __name__ == "__main__":
|
|||
time.sleep(0.1)
|
||||
print("flashing", args.fn)
|
||||
code = open(args.fn, "rb").read()
|
||||
Panda.flash_static(CanHandle(p), code)
|
||||
Panda.flash_static(CanHandle(p, 0), code)
|
||||
|
||||
print("can flash done")
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
import os
|
||||
import time
|
||||
import subprocess
|
||||
import unittest
|
||||
from panda import Panda, BASEDIR
|
||||
from panda_jungle import PandaJungle # pylint: disable=import-error
|
||||
from panda.tests.pedal.canhandle import CanHandle
|
||||
|
||||
|
||||
JUNGLE_SERIAL = os.getenv("PEDAL_JUNGLE")
|
||||
PEDAL_SERIAL = 'none'
|
||||
PEDAL_BUS = 1
|
||||
|
||||
class TestPedal(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.jungle = PandaJungle(JUNGLE_SERIAL)
|
||||
self.jungle.set_panda_power(True)
|
||||
self.jungle.set_ignition(False)
|
||||
|
||||
def tearDown(self):
|
||||
self.jungle.close()
|
||||
|
||||
def _flash_over_can(self, bus, fw_file):
|
||||
print(f"Flashing {fw_file}")
|
||||
while len(self.jungle.can_recv()) != 0:
|
||||
continue
|
||||
self.jungle.can_send(0x200, b"\xce\xfa\xad\xde\x1e\x0b\xb0\x0a", bus)
|
||||
|
||||
time.sleep(0.1)
|
||||
with open(fw_file, "rb") as code:
|
||||
PandaJungle.flash_static(CanHandle(self.jungle, bus), code.read())
|
||||
|
||||
def _listen_can_frames(self):
|
||||
self.jungle.can_clear(0xFFFF)
|
||||
msgs = 0
|
||||
for _ in range(10):
|
||||
incoming = self.jungle.can_recv()
|
||||
for message in incoming:
|
||||
address, _, _, bus = message
|
||||
if address == 0x201 and bus == PEDAL_BUS:
|
||||
msgs += 1
|
||||
time.sleep(0.1)
|
||||
return msgs
|
||||
|
||||
def test_usb_fw(self):
|
||||
subprocess.check_output(f"cd {BASEDIR} && PEDAL=1 PEDAL_USB=1 scons", shell=True)
|
||||
self._flash_over_can(PEDAL_BUS, f"{BASEDIR}board/obj/pedal.bin.signed")
|
||||
time.sleep(2)
|
||||
p = Panda(PEDAL_SERIAL)
|
||||
self.assertTrue(p.is_pedal())
|
||||
p.close()
|
||||
self.assertTrue(self._listen_can_frames() > 40)
|
||||
|
||||
def test_nonusb_fw(self):
|
||||
subprocess.check_output(f"cd {BASEDIR} && PEDAL=1 scons", shell=True)
|
||||
self._flash_over_can(PEDAL_BUS, f"{BASEDIR}board/obj/pedal.bin.signed")
|
||||
time.sleep(2)
|
||||
self.assertTrue(PEDAL_SERIAL not in Panda.list())
|
||||
self.assertTrue(self._listen_can_frames() > 40)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue