# openpilot1/panda/python/usb.py
#
# 99 lines
# 3.6 KiB
# Python

import struct
from .base import BaseHandle, BaseSTBootloaderHandle, TIMEOUT
from .constants import McuType
class PandaUsbHandle(BaseHandle):
    """Handle that forwards every transfer to a wrapped libusb handle."""

    def __init__(self, libusb_handle):
        """Remember the libusb handle that all later calls are forwarded to."""
        self._libusb_handle = libusb_handle

    def close(self):
        """Close the wrapped libusb handle."""
        usb = self._libusb_handle
        usb.close()

    def controlWrite(
        self,
        request_type: int,
        request: int,
        value: int,
        index: int,
        data,
        timeout: int = TIMEOUT,
        expect_disconnect: bool = False,
    ):
        """Forward a control write to the libusb handle and return its result.

        ``expect_disconnect`` is accepted but not used by this implementation.
        """
        usb = self._libusb_handle
        result = usb.controlWrite(request_type, request, value, index, data, timeout)
        return result

    def controlRead(
        self,
        request_type: int,
        request: int,
        value: int,
        index: int,
        length: int,
        timeout: int = TIMEOUT,
    ):
        """Forward a control read to the libusb handle and return its result."""
        usb = self._libusb_handle
        result = usb.controlRead(request_type, request, value, index, length, timeout)
        return result

    def bulkWrite(self, endpoint: int, data: bytes, timeout: int = TIMEOUT) -> int:
        """Forward a bulk write to the libusb handle and return its result."""
        usb = self._libusb_handle
        return usb.bulkWrite(endpoint, data, timeout)  # type: ignore

    def bulkRead(self, endpoint: int, length: int, timeout: int = TIMEOUT) -> bytes:
        """Forward a bulk read to the libusb handle and return its result."""
        usb = self._libusb_handle
        return usb.bulkRead(endpoint, length, timeout)  # type: ignore
class STBootloaderUSBHandle(BaseSTBootloaderHandle):
    """Bootloader handle that issues DFU requests as USB control transfers.

    All traffic goes through the libusb handle given to the constructor, using
    request type 0x21 and the ``DFU_*`` request codes defined below.
    """

    # Request codes passed as the ``request`` argument of the control transfers.
    DFU_DNLOAD = 1
    DFU_UPLOAD = 2
    DFU_GETSTATUS = 3
    DFU_CLRSTATUS = 4
    DFU_ABORT = 6

    def __init__(self, libusb_device, libusb_handle):
        """Store the handle and identify the MCU from its flash descriptor.

        String descriptors 0..19 are scanned for one starting with
        "@Internal Flash"; the sector counts in its last '/'-separated field
        are summed and matched against ``McuType`` sector counts.

        Args:
            libusb_device: accepted for interface compatibility; not used here.
            libusb_handle: open libusb handle used for all transfers.

        Raises:
            AssertionError: if no flash descriptor is found or the sector
                count matches no known ``McuType``.
        """
        self._libusb_handle = libusb_handle

        # example from F4: lsusb -v | grep Flash
        # iInterface 4 @Internal Flash /0x08000000/04*016Kg,01*064Kg,011*128Kg
        # -> 4 + 1 + 11 = 16 sectors
        sector_count = None  # stays None when no flash descriptor is present
        for i in range(20):
            desc = libusb_handle.getStringDescriptor(i, 0)
            if desc is not None and desc.startswith("@Internal Flash"):
                layout = desc.split('/')[-1]
                sector_count = sum(int(s.split('*')[0]) for s in layout.split(','))
                break

        mcu_by_sector_count = {m.config.sector_count: m for m in McuType}
        # Raised explicitly (rather than via ``assert``) so the check also runs
        # under ``python -O``; the exception type and message are unchanged.
        if sector_count not in mcu_by_sector_count:
            raise AssertionError(f"Unkown MCU: {sector_count=}")
        self._mcu_type = mcu_by_sector_count[sector_count]

    def _status(self) -> None:
        """Poll DFU_GETSTATUS until byte 1 of the 6-byte reply is zero."""
        # NOTE(review): byte 1 is presumably the low byte of bwPollTimeout in
        # the DFU GETSTATUS reply - confirm against the DFU specification.
        while True:
            dat = self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6)
            if dat[1] == 0:
                break

    def _set_address_pointer(self, address: int) -> None:
        """Send the 0x21 (set address pointer) command and wait for completion."""
        # "<I" pins the address to 4 bytes little-endian on every host.
        cmd = b"\x21" + struct.pack("<I", address)
        self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 0, 0, cmd)
        self._status()

    def _erase_page_address(self, address: int) -> None:
        """Send the 0x41 command for ``address`` and wait for completion."""
        # "<I" pins the address to 4 bytes little-endian on every host.
        cmd = b"\x41" + struct.pack("<I", address)
        self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 0, 0, cmd)
        self._status()

    def get_mcu_type(self):
        """Return the ``McuType`` member detected in the constructor."""
        return self._mcu_type

    def erase_sector(self, sector: int):
        """Erase the flash sector with index ``sector``."""
        self._erase_page_address(self._mcu_type.config.sector_address(sector))

    def clear_status(self):
        """Bring the device out of state 0xa or 0x9, then re-read the status."""
        # Clear status
        stat = self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6)
        # NOTE(review): byte 4 is presumably bState (0xa = dfuERROR,
        # 0x9 = dfuUPLOAD-IDLE in the DFU state table) - confirm.
        if stat[4] == 0xa:
            self._libusb_handle.controlRead(0x21, self.DFU_CLRSTATUS, 0, 0, 0)
        elif stat[4] == 0x9:
            self._libusb_handle.controlWrite(0x21, self.DFU_ABORT, 0, 0, b"")
            self._status()
        # The reply is not needed; the read itself is kept from the original flow.
        self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6)

    def close(self):
        """Close the underlying libusb handle."""
        self._libusb_handle.close()

    def program(self, address, dat):
        """Write ``dat`` starting at ``address`` in block-sized chunks.

        The data is padded with 0xFF up to a whole number of blocks; the
        caller's buffer is never modified. Empty ``dat`` only sets the address
        pointer and programs nothing.
        """
        # Set Address Pointer
        self._set_address_pointer(address)

        # Nothing to program; also avoids a modulo by a zero block size below.
        if not dat:
            return

        # Program
        bs = min(len(dat), self._mcu_type.config.block_size)
        # Build a new object instead of ``+=`` so a bytearray argument is not
        # extended in place.
        dat = dat + b"\xFF" * ((bs - len(dat)) % bs)
        for i in range(len(dat) // bs):
            ldat = dat[i * bs:(i + 1) * bs]
            print(f"programming {i} with length {len(ldat)}")
            # Data blocks are numbered from 2 in the wValue field.
            self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 2 + i, 0, ldat)
            self._status()

    def jump(self, address):
        """Set the address pointer to ``address`` and request a jump to it."""
        self._set_address_pointer(address)
        try:
            # A zero-length download followed by a status read triggers the jump.
            self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 2, 0, b"")
            self._libusb_handle.controlRead(0x21, self.DFU_GETSTATUS, 0, 0, 6)
        except Exception:
            # Deliberately best-effort: failures here are ignored, presumably
            # because the device leaves the bootloader mid-transfer - confirm.
            pass