tizi: retry amp comms (#27735)
* tizi: retry amp comms * ensure all config is written together * simple test * check errors * test shutdown * a tici exclusive --------- Co-authored-by: Comma Device <device@comma.ai> old-commit-hash: 767ed4295f9924a98df5dcd79b8bd71d73511d73
This commit is contained in:
1
Jenkinsfile
vendored
1
Jenkinsfile
vendored
@@ -135,6 +135,7 @@ pipeline {
|
||||
["test sensord", "cd system/sensord/tests && python -m unittest test_sensord.py"],
|
||||
["test camerad", "python system/camerad/test/test_camerad.py"],
|
||||
["test exposure", "python system/camerad/test/test_exposure.py"],
|
||||
["test amp", "python system/hardware/tici/tests/test_amplifier.py"],
|
||||
])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import time
|
||||
from smbus2 import SMBus
|
||||
from collections import namedtuple
|
||||
from typing import List
|
||||
|
||||
# https://datasheets.maximintegrated.com/en/ds/MAX98089.pdf
|
||||
|
||||
@@ -104,31 +106,44 @@ class Amplifier:
|
||||
def __init__(self, debug=False):
|
||||
self.debug = debug
|
||||
|
||||
def set_config(self, config):
|
||||
def _get_shutdown_config(self, amp_disabled: bool) -> AmpConfig:
|
||||
return AmpConfig("Global shutdown", 0b0 if amp_disabled else 0b1, 0x51, 7, 0b10000000)
|
||||
|
||||
def _set_configs(self, configs: List[AmpConfig]) -> None:
|
||||
with SMBus(self.AMP_I2C_BUS) as bus:
|
||||
if self.debug:
|
||||
print(f"Setting \"{config.name}\" to {config.value}:")
|
||||
for config in configs:
|
||||
if self.debug:
|
||||
print(f"Setting \"{config.name}\" to {config.value}:")
|
||||
|
||||
old_value = bus.read_byte_data(self.AMP_ADDRESS, config.register, force=True)
|
||||
new_value = (old_value & (~config.mask)) | ((config.value << config.offset) & config.mask)
|
||||
bus.write_byte_data(self.AMP_ADDRESS, config.register, new_value, force=True)
|
||||
old_value = bus.read_byte_data(self.AMP_ADDRESS, config.register, force=True)
|
||||
new_value = (old_value & (~config.mask)) | ((config.value << config.offset) & config.mask)
|
||||
bus.write_byte_data(self.AMP_ADDRESS, config.register, new_value, force=True)
|
||||
|
||||
if self.debug:
|
||||
print(f" Changed {hex(config.register)}: {hex(old_value)} -> {hex(new_value)}")
|
||||
if self.debug:
|
||||
print(f" Changed {hex(config.register)}: {hex(old_value)} -> {hex(new_value)}")
|
||||
|
||||
def set_global_shutdown(self, amp_disabled):
|
||||
self.set_config(AmpConfig("Global shutdown", 0b0 if amp_disabled else 0b1, 0x51, 7, 0b10000000))
|
||||
def set_configs(self, configs: List[AmpConfig]) -> bool:
|
||||
# retry in case panda is using the amp
|
||||
for _ in range(10):
|
||||
try:
|
||||
self._set_configs(configs)
|
||||
return True
|
||||
except OSError:
|
||||
print("Failed to set amp config, retrying...")
|
||||
time.sleep(0.02)
|
||||
return False
|
||||
|
||||
def initialize_configuration(self, model):
|
||||
self.set_global_shutdown(amp_disabled=True)
|
||||
def set_global_shutdown(self, amp_disabled: bool) -> bool:
|
||||
return self.set_configs([self._get_shutdown_config(amp_disabled), ])
|
||||
|
||||
for config in BASE_CONFIG:
|
||||
self.set_config(config)
|
||||
|
||||
for config in CONFIGS[model]:
|
||||
self.set_config(config)
|
||||
|
||||
self.set_global_shutdown(amp_disabled=False)
|
||||
def initialize_configuration(self, model: str) -> bool:
|
||||
cfgs = [
|
||||
self._get_shutdown_config(True),
|
||||
*BASE_CONFIG,
|
||||
*CONFIGS[model],
|
||||
self._get_shutdown_config(False),
|
||||
]
|
||||
return self.set_configs(cfgs)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
70
system/hardware/tici/tests/test_amplifier.py
Executable file
70
system/hardware/tici/tests/test_amplifier.py
Executable file
@@ -0,0 +1,70 @@
|
||||
#!/usr/bin/env python3
|
||||
import time
|
||||
import random
|
||||
import unittest
|
||||
import subprocess
|
||||
|
||||
from panda import Panda
|
||||
from system.hardware import TICI
|
||||
from system.hardware.tici.hardware import Tici
|
||||
from system.hardware.tici.amplifier import Amplifier
|
||||
|
||||
|
||||
class TestAmplifier(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if not TICI:
|
||||
raise unittest.SkipTest
|
||||
|
||||
def setUp(self):
|
||||
# clear dmesg
|
||||
subprocess.check_call("sudo dmesg -C", shell=True)
|
||||
|
||||
self.panda = Panda()
|
||||
self.panda.reset()
|
||||
|
||||
def tearDown(self):
|
||||
self.panda.reset(reconnect=False)
|
||||
|
||||
def _check_for_i2c_errors(self, expected):
|
||||
dmesg = subprocess.check_output("dmesg", shell=True, encoding='utf8')
|
||||
i2c_lines = [l for l in dmesg.strip().splitlines() if 'i2c_geni a88000.i2c' in l]
|
||||
i2c_str = '\n'.join(i2c_lines)
|
||||
if not expected:
|
||||
assert len(i2c_lines) == 0
|
||||
else:
|
||||
assert "i2c error :-107" in i2c_str or "Bus arbitration lost" in i2c_str
|
||||
|
||||
def test_init(self):
|
||||
amp = Amplifier(debug=True)
|
||||
r = amp.initialize_configuration(Tici().model)
|
||||
assert r
|
||||
self._check_for_i2c_errors(False)
|
||||
|
||||
def test_shutdown(self):
|
||||
amp = Amplifier(debug=True)
|
||||
for _ in range(10):
|
||||
r = amp.set_global_shutdown(True)
|
||||
r = amp.set_global_shutdown(False)
|
||||
assert r
|
||||
self._check_for_i2c_errors(False)
|
||||
|
||||
def test_init_while_siren_play(self):
|
||||
for _ in range(5):
|
||||
self.panda.set_siren(False)
|
||||
time.sleep(0.1)
|
||||
|
||||
self.panda.set_siren(True)
|
||||
time.sleep(random.randint(0, 5))
|
||||
|
||||
amp = Amplifier(debug=True)
|
||||
r = amp.initialize_configuration(Tici().model)
|
||||
assert r
|
||||
|
||||
# make sure we're a good test
|
||||
self._check_for_i2c_errors(True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user