SPI: connect by serial/UID (#1213)

* SPI: connect by serial/UID

* cleanup

* not everyone can have spi

* fix that

* move that

* mypy fix

Co-authored-by: Comma Device <device@comma.ai>
This commit is contained in:
Adeeb Shihadeh 2023-01-15 23:30:24 -08:00 committed by GitHub
parent b8693da342
commit 3048760737
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 29 deletions

View File

@ -7,7 +7,6 @@ import struct
import hashlib
import binascii
import datetime
import traceback
import warnings
import logging
from functools import wraps
@ -17,7 +16,7 @@ from itertools import accumulate
from .config import DEFAULT_FW_FN, DEFAULT_H7_FW_FN, SECTOR_SIZES_FX, SECTOR_SIZES_H7
from .dfu import PandaDFU, MCU_TYPE_F2, MCU_TYPE_F4, MCU_TYPE_H7
from .isotp import isotp_send, isotp_recv
from .spi import SpiHandle
from .spi import SpiHandle, PandaSpiException
__version__ = '0.0.10'
@ -227,7 +226,7 @@ class Panda:
FLAG_GM_HW_CAM = 1
FLAG_GM_HW_CAM_LONG = 2
def __init__(self, serial: Optional[str] = None, claim: bool = True, spi: bool = False, disable_checks: bool = True):
def __init__(self, serial: Optional[str] = None, claim: bool = True, disable_checks: bool = True):
self._serial = serial
self._disable_checks = disable_checks
@ -237,7 +236,6 @@ class Panda:
self.can_rx_overflow_buffer = b''
# connect and set mcu type
self._spi = spi
self.connect(claim)
# reset comms
@ -258,15 +256,10 @@ class Panda:
self.close()
self._handle = None
if self._spi:
self._handle = SpiHandle()
# TODO implement
self._serial = "SPIDEV"
self.bootstub = False
else:
self.usb_connect(claim=claim, wait=wait)
# try USB first, then SPI
self.usb_connect(claim=claim, wait=wait)
if self._handle is None:
self.spi_connect()
assert self._handle is not None
self._mcu_type = self.get_mcu_type()
@ -278,6 +271,23 @@ class Panda:
self.set_heartbeat_disabled()
self.set_power_save(0)
def spi_connect(self):
# get UID to confirm slave is present and up
spi_serial = None
try:
self._handle = SpiHandle()
spi_serial = self.get_uid()
except PandaSpiException:
pass
if spi_serial is not None and ((self._serial is None) or (self._serial == spi_serial)):
self._serial = spi_serial
# TODO: detect this
self.bootstub = False
else:
# failed to connect
self._handle = None
def usb_connect(self, claim=True, wait=False):
context = usb1.USBContext()
while 1:
@ -288,9 +298,10 @@ class Panda:
this_serial = device.getSerialNumber()
except Exception:
continue
if self._serial is None or this_serial == self._serial:
self._serial = this_serial
print("opening device", self._serial, hex(device.getProductID()))
logging.debug("opening device %s %s", this_serial, hex(device.getProductID()))
self.bootstub = device.getProductID() == 0xddee
self._handle = device.open()
if sys.platform not in ("win32", "cygwin", "msys", "darwin"):
@ -305,9 +316,8 @@ class Panda:
self._bcd_device = bytearray([bcd >> 8, ])
break
except Exception as e:
print("exception", e)
traceback.print_exc()
except Exception:
logging.exception("USB connect error")
if not wait or self._handle is not None:
break
context = usb1.USBContext() # New context needed so new devices show up
@ -349,8 +359,6 @@ class Panda:
if not success:
raise Exception("reconnect failed")
@staticmethod
def flash_static(handle, code, mcu_type):
assert mcu_type is not None, "must set valid mcu_type to flash"

View File

@ -1,3 +1,4 @@
import os
import fcntl
import math
import time
@ -24,10 +25,15 @@ MAX_XFER_RETRY_COUNT = 5
USB_MAX_SIZE = 0x40
DEV_PATH = "/dev/spidev0.0"
class PandaSpiException(Exception):
pass
class PandaSpiUnavailable(PandaSpiException):
pass
class PandaSpiNackResponse(PandaSpiException):
pass
@ -52,8 +58,10 @@ def flocked(fd):
# This mimics the handle given by libusb1 for easy interoperability
class SpiHandle:
def __init__(self):
if not os.path.exists(DEV_PATH):
raise PandaSpiUnavailable(f"SPI device not found: {DEV_PATH}")
if spidev is None:
raise RuntimeError("spidev is not available")
raise PandaSpiUnavailable("spidev is not installed")
self.spi = spidev.SpiDev() # pylint: disable=c-extension-no-member
self.spi.open(0, 0)
@ -82,6 +90,7 @@ class SpiHandle:
logging.debug("starting transfer: endpoint=%d, max_rx_len=%d", endpoint, max_rx_len)
logging.debug("==============================================")
exc = PandaSpiException()
for n in range(MAX_XFER_RETRY_COUNT):
logging.debug("\ntry #%d", n+1)
try:
@ -111,9 +120,10 @@ class SpiHandle:
raise PandaSpiBadChecksum
return dat[:-1]
except PandaSpiException:
logging.exception("SPI transfer failed, %d retries left", n)
raise PandaSpiTransferFailed(f"SPI transaction failed {MAX_XFER_RETRY_COUNT} times")
except PandaSpiException as e:
exc = e
logging.debug("SPI transfer failed, %d retries left", n, exc_info=True)
raise exc
# libusb1 functions
def close(self):

View File

@ -1,17 +1,12 @@
#!/usr/bin/env python3
import os
import time
from panda import Panda
if __name__ == "__main__":
spi = "SPI" in os.environ
print("using SPI" if spi else "using USB")
i = 0
pi = 0
panda = Panda(spi=spi)
panda = Panda()
while True:
st = time.monotonic()
while time.monotonic() - st < 1: