pigeond tests (#25630)

* start pigeond tests

* sm checks

* add some types

* little more

Co-authored-by: Comma Device <device@comma.ai>
old-commit-hash: 4bb399ba3c
This commit is contained in:
Adeeb Shihadeh 2022-08-31 23:12:26 -07:00 committed by GitHub
parent caac56b92e
commit 3308e740ea
8 changed files with 107 additions and 31 deletions

3
Jenkinsfile vendored
View File

@ -131,7 +131,8 @@ pipeline {
["test boardd loopback", "python selfdrive/boardd/tests/test_boardd_loopback.py"],
["test loggerd", "python selfdrive/loggerd/tests/test_loggerd.py"],
["test encoder", "LD_LIBRARY_PATH=/usr/local/lib python selfdrive/loggerd/tests/test_encoder.py"],
["test sensord", "python selfdrive/sensord/test/test_sensord.py"],
["test sensord", "python selfdrive/sensord/tests/test_sensord.py"],
["test pigeond", "python selfdrive/sensord/tests/test_pigeond.py"],
])
}
}

View File

@ -1,3 +1,5 @@
from typing import Optional
def gpio_init(pin: int, output: bool) -> None:
try:
with open(f"/sys/class/gpio/gpio{pin}/direction", 'wb') as f:
@ -5,10 +7,19 @@ def gpio_init(pin: int, output: bool) -> None:
except Exception as e:
print(f"Failed to set gpio {pin} direction: {e}")
def gpio_set(pin: int, high: bool) -> None:
try:
with open(f"/sys/class/gpio/gpio{pin}/value", 'wb') as f:
f.write(b"1" if high else b"0")
except Exception as e:
print(f"Failed to set gpio {pin} value: {e}")
def gpio_read(pin: int) -> Optional[bool]:
val = None
try:
with open(f"/sys/class/gpio/gpio{pin}/value", 'rb') as f:
val = bool(int(f.read().strip()))
except Exception as e:
print(f"Failed to set gpio {pin} value: {e}")
return val

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3
import sys
import time
import signal
@ -8,6 +7,7 @@ import struct
import requests
import urllib.parse
from datetime import datetime
from typing import List, Optional
from cereal import messaging
from common.params import Params
@ -25,7 +25,7 @@ UBLOX_SOS_NACK = b"\xb5\x62\x09\x14\x08\x00\x02\x00\x00\x00\x00\x00\x00\x00"
UBLOX_BACKUP_RESTORE_MSG = b"\xb5\x62\x09\x14\x08\x00\x03"
UBLOX_ASSIST_ACK = b"\xb5\x62\x13\x60\x08\x00"
def set_power(enabled):
def set_power(enabled: bool) -> None:
gpio_init(GPIO.UBLOX_SAFEBOOT_N, True)
gpio_init(GPIO.UBLOX_PWR_EN, True)
gpio_init(GPIO.UBLOX_RST_N, True)
@ -35,14 +35,14 @@ def set_power(enabled):
gpio_set(GPIO.UBLOX_RST_N, enabled)
def add_ubx_checksum(msg):
def add_ubx_checksum(msg: bytes) -> bytes:
A = B = 0
for b in msg[2:]:
A = (A + b) % 256
B = (B + A) % 256
return msg + bytes([A, B])
def get_assistnow_messages(token):
def get_assistnow_messages(token: bytes) -> List[bytes]:
# make request
# TODO: implement adding the last known location
r = requests.get("https://online-live2.services.u-blox.com/GetOnlineData.ashx", params=urllib.parse.urlencode({
@ -64,14 +64,13 @@ def get_assistnow_messages(token):
class TTYPigeon():
def __init__(self, path):
self.path = path
def __init__(self):
self.tty = serial.VTIMESerial(UBLOX_TTY, baudrate=9600, timeout=0)
def send(self, dat):
def send(self, dat: bytes) -> None:
self.tty.write(dat)
def receive(self):
def receive(self) -> bytes:
dat = b''
while len(dat) < 0x1000:
d = self.tty.read(0x40)
@ -80,10 +79,10 @@ class TTYPigeon():
break
return dat
def set_baud(self, baud):
def set_baud(self, baud: int) -> None:
self.tty.baudrate = baud
def wait_for_ack(self, ack=UBLOX_ACK, nack=UBLOX_NACK, timeout=0.5):
def wait_for_ack(self, ack: bytes = UBLOX_ACK, nack: bytes = UBLOX_NACK, timeout: float = 0.5) -> bool:
dat = b''
st = time.monotonic()
while True:
@ -99,11 +98,11 @@ class TTYPigeon():
raise TimeoutError('No response from ublox')
time.sleep(0.001)
def send_with_ack(self, dat, ack=UBLOX_ACK, nack=UBLOX_NACK):
def send_with_ack(self, dat: bytes, ack: bytes = UBLOX_ACK, nack: bytes = UBLOX_NACK) -> None:
self.send(dat)
self.wait_for_ack(ack, nack)
def wait_for_backup_restore_status(self, timeout=1):
def wait_for_backup_restore_status(self, timeout: float = 1.) -> int:
dat = b''
st = time.monotonic()
while True:
@ -117,7 +116,7 @@ class TTYPigeon():
time.sleep(0.001)
def initialize_pigeon(pigeon):
def initialize_pigeon(pigeon: TTYPigeon) -> None:
# try initializing a few times
for _ in range(10):
try:
@ -200,21 +199,22 @@ def initialize_pigeon(pigeon):
except TimeoutError:
cloudlog.warning("Initialization failed, trying again!")
def deinitialize_and_exit(pigeon):
def deinitialize_and_exit(pigeon: Optional[TTYPigeon]):
cloudlog.warning("Storing almanac in ublox flash")
# controlled GNSS stop
pigeon.send(b"\xB5\x62\x06\x04\x04\x00\x00\x00\x08\x00\x16\x74")
if pigeon is not None:
# controlled GNSS stop
pigeon.send(b"\xB5\x62\x06\x04\x04\x00\x00\x00\x08\x00\x16\x74")
# store almanac in flash
pigeon.send(b"\xB5\x62\x09\x14\x04\x00\x00\x00\x00\x00\x21\xEC")
try:
if pigeon.wait_for_ack(ack=UBLOX_SOS_ACK, nack=UBLOX_SOS_NACK):
cloudlog.warning("Done storing almanac")
else:
cloudlog.error("Error storing almanac")
except TimeoutError:
pass
# store almanac in flash
pigeon.send(b"\xB5\x62\x09\x14\x04\x00\x00\x00\x00\x00\x21\xEC")
try:
if pigeon.wait_for_ack(ack=UBLOX_SOS_ACK, nack=UBLOX_SOS_NACK):
cloudlog.warning("Done storing almanac")
else:
cloudlog.error("Error storing almanac")
except TimeoutError:
pass
# turn off power and exit cleanly
set_power(False)
@ -223,6 +223,10 @@ def deinitialize_and_exit(pigeon):
def main():
assert TICI, "unsupported hardware for pigeond"
# register exit handler
pigeon = None
signal.signal(signal.SIGINT, lambda sig, frame: deinitialize_and_exit(pigeon))
pm = messaging.PubMaster(['ubloxRaw'])
# power cycle ublox
@ -231,12 +235,9 @@ def main():
set_power(True)
time.sleep(0.5)
pigeon = TTYPigeon(UBLOX_TTY)
pigeon = TTYPigeon()
initialize_pigeon(pigeon)
# register exit handler
signal.signal(signal.SIGINT, lambda sig, frame: deinitialize_and_exit(pigeon))
# start receiving data
while True:
dat = pigeon.receive()

View File

View File

@ -0,0 +1,63 @@
#!/usr/bin/env python3
import time
import unittest
import cereal.messaging as messaging
from cereal.services import service_list
from common.gpio import gpio_read
from selfdrive.test.helpers import with_processes
from selfdrive.manager.process_config import managed_processes
from system.hardware import TICI
from system.hardware.tici.pins import GPIO
# TODO: test TTFF when we have good A-GNSS
class TestPigeond(unittest.TestCase):
@classmethod
def setUpClass(cls):
if not TICI:
raise unittest.SkipTest
def tearDown(self):
managed_processes['pigeond'].stop()
@with_processes(['pigeond'])
def test_frequency(self):
sm = messaging.SubMaster(['ubloxRaw'])
# setup time
time.sleep(2)
sm.update()
for _ in range(int(10 * service_list['ubloxRaw'].frequency)):
sm.update()
assert sm.all_checks()
def test_startup_time(self):
for _ in range(5):
sm = messaging.SubMaster(['ubloxRaw'])
managed_processes['pigeond'].start()
start_time = time.monotonic()
for __ in range(10):
sm.update(1 * 1000)
if sm.updated['ubloxRaw']:
break
assert sm.rcv_frame['ubloxRaw'] > 0, "pigeond didn't start outputting messages in time"
et = time.monotonic() - start_time
assert et < 5, f"pigeond took {et:.1f}s to start"
managed_processes['pigeond'].stop()
def test_turns_off_ublox(self):
for s in (0.1, 0.5, 1, 5):
managed_processes['pigeond'].start()
time.sleep(s)
managed_processes['pigeond'].stop()
assert gpio_read(GPIO.UBLOX_RST_N) == 0
assert gpio_read(GPIO.UBLOX_PWR_EN) == 0
if __name__ == "__main__":
unittest.main()