mirror of
https://github.com/infiniteCable2/panda.git
synced 2026-02-18 17:23:52 +08:00
python: show SPI pandas in list (#1216)
* refactor connect * spi list * add back bcd * couple fixes * fix Co-authored-by: Comma Device <device@comma.ai>
This commit is contained in:
@@ -255,11 +255,14 @@ class Panda:
|
||||
self._handle = None
|
||||
|
||||
# try USB first, then SPI
|
||||
self.usb_connect(claim=claim, wait=wait)
|
||||
self._handle, serial, self.bootstub, self._bcd_device = self.usb_connect(self._serial, claim=claim, wait=wait)
|
||||
if self._handle is None:
|
||||
self.spi_connect()
|
||||
self._handle, serial, self.bootstub, _ = self.spi_connect(self._serial)
|
||||
|
||||
assert self._handle is not None
|
||||
if self._handle is None:
|
||||
raise Exception("failed to connect to panda")
|
||||
|
||||
self._serial = serial
|
||||
self._mcu_type = self.get_mcu_type()
|
||||
self.health_version, self.can_version, self.can_health_version = self.get_packets_versions()
|
||||
logging.debug("connected")
|
||||
@@ -269,24 +272,29 @@ class Panda:
|
||||
self.set_heartbeat_disabled()
|
||||
self.set_power_save(0)
|
||||
|
||||
def spi_connect(self):
|
||||
@staticmethod
|
||||
def spi_connect(serial):
|
||||
# get UID to confirm slave is present and up
|
||||
handle = None
|
||||
spi_serial = None
|
||||
try:
|
||||
self._handle = SpiHandle()
|
||||
spi_serial = self.get_uid()
|
||||
handle = SpiHandle()
|
||||
dat = handle.controlRead(Panda.REQUEST_IN, 0xc3, 0, 0, 12)
|
||||
spi_serial = binascii.hexlify(dat).decode()
|
||||
except PandaSpiException:
|
||||
pass
|
||||
|
||||
if spi_serial is not None and ((self._serial is None) or (self._serial == spi_serial)):
|
||||
self._serial = spi_serial
|
||||
# TODO: detect this
|
||||
self.bootstub = False
|
||||
else:
|
||||
# failed to connect
|
||||
self._handle = None
|
||||
# no connection or wrong panda
|
||||
if spi_serial is None or (serial is not None and (spi_serial != serial)):
|
||||
handle = None
|
||||
spi_serial = None
|
||||
|
||||
def usb_connect(self, claim=True, wait=False):
|
||||
# TODO: detect bootstub
|
||||
return handle, spi_serial, False, None
|
||||
|
||||
@staticmethod
|
||||
def usb_connect(serial, claim=True, wait=False):
|
||||
handle, usb_serial, bootstub, bcd = None, None, None, None
|
||||
context = usb1.USBContext()
|
||||
while 1:
|
||||
try:
|
||||
@@ -297,29 +305,64 @@ class Panda:
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if self._serial is None or this_serial == self._serial:
|
||||
self._serial = this_serial
|
||||
if serial is None or this_serial == serial:
|
||||
logging.debug("opening device %s %s", this_serial, hex(device.getProductID()))
|
||||
self.bootstub = device.getProductID() == 0xddee
|
||||
self._handle = device.open()
|
||||
|
||||
usb_serial = this_serial
|
||||
bootstub = device.getProductID() == 0xddee
|
||||
handle = device.open()
|
||||
if sys.platform not in ("win32", "cygwin", "msys", "darwin"):
|
||||
self._handle.setAutoDetachKernelDriver(True)
|
||||
handle.setAutoDetachKernelDriver(True)
|
||||
if claim:
|
||||
self._handle.claimInterface(0)
|
||||
# self._handle.setInterfaceAltSetting(0, 0) # Issue in USB stack
|
||||
handle.claimInterface(0)
|
||||
# handle.setInterfaceAltSetting(0, 0) # Issue in USB stack
|
||||
|
||||
# bcdDevice wasn't always set to the hw type, ignore if it's the old constant
|
||||
bcd = device.getbcdDevice()
|
||||
if bcd is not None and bcd != 0x2300:
|
||||
self._bcd_device = bytearray([bcd >> 8, ])
|
||||
this_bcd = device.getbcdDevice()
|
||||
if this_bcd is not None and this_bcd != 0x2300:
|
||||
bcd = bytearray([this_bcd >> 8, ])
|
||||
|
||||
break
|
||||
except Exception:
|
||||
logging.exception("USB connect error")
|
||||
if not wait or self._handle is not None:
|
||||
if not wait or handle is not None:
|
||||
break
|
||||
context = usb1.USBContext() # New context needed so new devices show up
|
||||
|
||||
return handle, usb_serial, bootstub, bcd
|
||||
|
||||
@staticmethod
|
||||
def list():
|
||||
ret = Panda.usb_list()
|
||||
ret += Panda.spi_list()
|
||||
return list(set(ret))
|
||||
|
||||
@staticmethod
|
||||
def usb_list():
|
||||
context = usb1.USBContext()
|
||||
ret = []
|
||||
try:
|
||||
for device in context.getDeviceList(skip_on_error=True):
|
||||
if device.getVendorID() == 0xbbaa and device.getProductID() in (0xddcc, 0xddee):
|
||||
try:
|
||||
serial = device.getSerialNumber()
|
||||
if len(serial) == 24:
|
||||
ret.append(serial)
|
||||
else:
|
||||
warnings.warn(f"found device with panda descriptors but invalid serial: {serial}", RuntimeWarning)
|
||||
except Exception:
|
||||
continue
|
||||
except Exception:
|
||||
pass
|
||||
return ret
|
||||
|
||||
@staticmethod
|
||||
def spi_list():
|
||||
_, serial, _, _ = Panda.spi_connect(None)
|
||||
if serial is not None:
|
||||
return [serial, ]
|
||||
return []
|
||||
|
||||
def reset(self, enter_bootstub=False, enter_bootloader=False, reconnect=True):
|
||||
try:
|
||||
if enter_bootloader:
|
||||
@@ -444,25 +487,6 @@ class Panda:
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def list():
|
||||
context = usb1.USBContext()
|
||||
ret = []
|
||||
try:
|
||||
for device in context.getDeviceList(skip_on_error=True):
|
||||
if device.getVendorID() == 0xbbaa and device.getProductID() in (0xddcc, 0xddee):
|
||||
try:
|
||||
serial = device.getSerialNumber()
|
||||
if len(serial) == 24:
|
||||
ret.append(serial)
|
||||
else:
|
||||
warnings.warn(f"found device with panda descriptors but invalid serial: {serial}", RuntimeWarning)
|
||||
except Exception:
|
||||
continue
|
||||
except Exception:
|
||||
pass
|
||||
return ret
|
||||
|
||||
def call_control_api(self, msg):
|
||||
self._handle.controlWrite(Panda.REQUEST_OUT, msg, 0, 0, b'')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user