mirror of
https://github.com/infiniteCable2/panda.git
synced 2026-02-18 17:23:52 +08:00
python lib: add SPI bootloader support (#1224)
* talk to spi bootloader * flashing bootstub * reset * get mcu type * little more * pull out low level panda comms * lint * program app --------- Co-authored-by: Comma Device <device@comma.ai>
This commit is contained in:
@@ -7,6 +7,7 @@ BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
|
||||
|
||||
class McuConfig(NamedTuple):
|
||||
mcu: str
|
||||
mcu_idcode: int
|
||||
block_size: int
|
||||
sector_sizes: List[int]
|
||||
serial_number_address: int
|
||||
@@ -24,11 +25,12 @@ Fx = (
|
||||
0x8000000,
|
||||
os.path.join(BASEDIR, "board", "obj", "bootstub.panda.bin"),
|
||||
)
|
||||
F2Config = McuConfig("STM32F2", *Fx)
|
||||
F4Config = McuConfig("STM32F4", *Fx)
|
||||
F2Config = McuConfig("STM32F2", 0x411, *Fx)
|
||||
F4Config = McuConfig("STM32F4", 0x463, *Fx)
|
||||
|
||||
H7Config = McuConfig(
|
||||
"STM32H7",
|
||||
0x483,
|
||||
0x400,
|
||||
# there is an 8th sector, but we use that for the provisioning chunk, so don't program over that!
|
||||
[0x20000 for _ in range(7)],
|
||||
|
||||
@@ -4,6 +4,7 @@ import math
|
||||
import time
|
||||
import struct
|
||||
import logging
|
||||
import threading
|
||||
from contextlib import contextmanager
|
||||
from functools import reduce
|
||||
from typing import List
|
||||
@@ -47,26 +48,42 @@ class PandaSpiTransferFailed(PandaSpiException):
|
||||
pass
|
||||
|
||||
|
||||
@contextmanager
|
||||
def flocked(fd):
|
||||
try:
|
||||
fcntl.flock(fd, fcntl.LOCK_EX)
|
||||
yield
|
||||
finally:
|
||||
fcntl.flock(fd, fcntl.LOCK_UN)
|
||||
SPI_LOCK = threading.Lock()
|
||||
|
||||
# This mimics the handle given by libusb1 for easy interoperability
|
||||
class SpiHandle:
|
||||
def __init__(self):
|
||||
class SpiDevice:
|
||||
"""
|
||||
Provides locked, thread-safe access to a panda's SPI interface.
|
||||
"""
|
||||
def __init__(self, speed=30000000):
|
||||
if not os.path.exists(DEV_PATH):
|
||||
raise PandaSpiUnavailable(f"SPI device not found: {DEV_PATH}")
|
||||
if spidev is None:
|
||||
raise PandaSpiUnavailable("spidev is not installed")
|
||||
|
||||
self.spi = spidev.SpiDev() # pylint: disable=c-extension-no-member
|
||||
self.spi.open(0, 0)
|
||||
self._spidev = spidev.SpiDev() # pylint: disable=c-extension-no-member
|
||||
self._spidev.open(0, 0)
|
||||
self._spidev.max_speed_hz = speed
|
||||
|
||||
self.spi.max_speed_hz = 30000000
|
||||
@contextmanager
|
||||
def acquire(self):
|
||||
try:
|
||||
SPI_LOCK.acquire()
|
||||
fcntl.flock(self._spidev, fcntl.LOCK_EX)
|
||||
yield self._spidev
|
||||
finally:
|
||||
fcntl.flock(self._spidev, fcntl.LOCK_UN)
|
||||
SPI_LOCK.release()
|
||||
|
||||
def close(self):
|
||||
self._spidev.close()
|
||||
|
||||
|
||||
class SpiHandle:
|
||||
"""
|
||||
A class that mimics a libusb1 handle for panda SPI communications.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.dev = SpiDevice()
|
||||
|
||||
# helpers
|
||||
def _calc_checksum(self, data: List[int]) -> int:
|
||||
@@ -75,10 +92,10 @@ class SpiHandle:
|
||||
cksum ^= b
|
||||
return cksum
|
||||
|
||||
def _wait_for_ack(self, ack_val: int) -> None:
|
||||
def _wait_for_ack(self, spi, ack_val: int) -> None:
|
||||
start = time.monotonic()
|
||||
while (time.monotonic() - start) < ACK_TIMEOUT_SECONDS:
|
||||
dat = self.spi.xfer2(b"\x12")[0]
|
||||
dat = spi.xfer2(b"\x12")[0]
|
||||
if dat == NACK:
|
||||
raise PandaSpiNackResponse
|
||||
elif dat == ack_val:
|
||||
@@ -86,7 +103,7 @@ class SpiHandle:
|
||||
|
||||
raise PandaSpiMissingAck
|
||||
|
||||
def _transfer(self, endpoint: int, data, max_rx_len: int = 1000) -> bytes:
|
||||
def _transfer(self, spi, endpoint: int, data, max_rx_len: int = 1000) -> bytes:
|
||||
logging.debug("starting transfer: endpoint=%d, max_rx_len=%d", endpoint, max_rx_len)
|
||||
logging.debug("==============================================")
|
||||
|
||||
@@ -97,25 +114,25 @@ class SpiHandle:
|
||||
logging.debug("- send header")
|
||||
packet = struct.pack("<BBHH", SYNC, endpoint, len(data), max_rx_len)
|
||||
packet += bytes([reduce(lambda x, y: x^y, packet) ^ CHECKSUM_START])
|
||||
self.spi.xfer2(packet)
|
||||
spi.xfer2(packet)
|
||||
|
||||
logging.debug("- waiting for header ACK")
|
||||
self._wait_for_ack(HACK)
|
||||
self._wait_for_ack(spi, HACK)
|
||||
|
||||
# send data
|
||||
logging.debug("- sending data")
|
||||
packet = bytes([*data, self._calc_checksum(data)])
|
||||
self.spi.xfer2(packet)
|
||||
spi.xfer2(packet)
|
||||
|
||||
logging.debug("- waiting for data ACK")
|
||||
self._wait_for_ack(DACK)
|
||||
self._wait_for_ack(spi, DACK)
|
||||
|
||||
# get response length, then response
|
||||
response_len_bytes = bytes(self.spi.xfer2(b"\x00" * 2))
|
||||
response_len_bytes = bytes(spi.xfer2(b"\x00" * 2))
|
||||
response_len = struct.unpack("<H", response_len_bytes)[0]
|
||||
|
||||
logging.debug("- receiving response")
|
||||
dat = bytes(self.spi.xfer2(b"\x00" * (response_len + 1)))
|
||||
dat = bytes(spi.xfer2(b"\x00" * (response_len + 1)))
|
||||
if self._calc_checksum([DACK, *response_len_bytes, *dat]) != 0:
|
||||
raise PandaSpiBadChecksum
|
||||
|
||||
@@ -127,28 +144,28 @@ class SpiHandle:
|
||||
|
||||
# libusb1 functions
|
||||
def close(self):
|
||||
self.spi.close()
|
||||
self.dev.close()
|
||||
|
||||
def controlWrite(self, request_type: int, request: int, value: int, index: int, data, timeout: int = 0):
|
||||
with flocked(self.spi):
|
||||
return self._transfer(0, struct.pack("<BHHH", request, value, index, 0))
|
||||
with self.dev.acquire() as spi:
|
||||
return self._transfer(spi, 0, struct.pack("<BHHH", request, value, index, 0))
|
||||
|
||||
def controlRead(self, request_type: int, request: int, value: int, index: int, length: int, timeout: int = 0):
|
||||
with flocked(self.spi):
|
||||
return self._transfer(0, struct.pack("<BHHH", request, value, index, length))
|
||||
with self.dev.acquire() as spi:
|
||||
return self._transfer(spi, 0, struct.pack("<BHHH", request, value, index, length))
|
||||
|
||||
# TODO: implement these properly
|
||||
def bulkWrite(self, endpoint: int, data: List[int], timeout: int = 0) -> int:
|
||||
with flocked(self.spi):
|
||||
with self.dev.acquire() as spi:
|
||||
for x in range(math.ceil(len(data) / USB_MAX_SIZE)):
|
||||
self._transfer(endpoint, data[USB_MAX_SIZE*x:USB_MAX_SIZE*(x+1)])
|
||||
self._transfer(spi, endpoint, data[USB_MAX_SIZE*x:USB_MAX_SIZE*(x+1)])
|
||||
return len(data)
|
||||
|
||||
def bulkRead(self, endpoint: int, length: int, timeout: int = 0) -> bytes:
|
||||
ret: List[int] = []
|
||||
with flocked(self.spi):
|
||||
with self.dev.acquire() as spi:
|
||||
for _ in range(math.ceil(length / USB_MAX_SIZE)):
|
||||
d = self._transfer(endpoint, [], max_rx_len=USB_MAX_SIZE)
|
||||
d = self._transfer(spi, endpoint, [], max_rx_len=USB_MAX_SIZE)
|
||||
ret += d
|
||||
if len(d) < USB_MAX_SIZE:
|
||||
break
|
||||
|
||||
118
python/spi_dfu.py
Normal file
118
python/spi_dfu.py
Normal file
@@ -0,0 +1,118 @@
|
||||
import time
|
||||
import struct
|
||||
from functools import reduce
|
||||
|
||||
from .constants import McuType
|
||||
from .spi import SpiDevice
|
||||
|
||||
SYNC = 0x5A
|
||||
ACK = 0x79
|
||||
NACK = 0x1F
|
||||
|
||||
# https://www.st.com/resource/en/application_note/an4286-spi-protocol-used-in-the-stm32-bootloader-stmicroelectronics.pdf
|
||||
class PandaSpiDFU:
|
||||
def __init__(self, dfu_serial):
|
||||
self.dev = SpiDevice(speed=1000000)
|
||||
|
||||
# say hello
|
||||
with self.dev.acquire() as spi:
|
||||
try:
|
||||
spi.xfer([SYNC, ])
|
||||
self._get_ack(spi)
|
||||
except Exception:
|
||||
raise Exception("failed to connect to panda") # pylint: disable=W0707
|
||||
|
||||
self._mcu_type = self.get_mcu_type()
|
||||
|
||||
def _get_ack(self, spi, timeout=1.0):
|
||||
data = 0x00
|
||||
start_time = time.monotonic()
|
||||
while data not in (ACK, NACK) and (time.monotonic() - start_time < timeout):
|
||||
data = spi.xfer([0x00, ])[0]
|
||||
time.sleep(0.001)
|
||||
spi.xfer([ACK, ])
|
||||
|
||||
if data == NACK:
|
||||
raise Exception("Got NACK response")
|
||||
elif data != ACK:
|
||||
raise Exception("Missing ACK")
|
||||
|
||||
def _cmd(self, cmd, data=None, read_bytes=0):
|
||||
with self.dev.acquire() as spi:
|
||||
# sync
|
||||
spi.xfer([SYNC, ])
|
||||
|
||||
# send command
|
||||
spi.xfer([cmd, cmd ^ 0xFF])
|
||||
self._get_ack(spi)
|
||||
|
||||
# send data
|
||||
if data is not None:
|
||||
for d in data:
|
||||
spi.xfer(self.add_checksum(d))
|
||||
self._get_ack(spi, timeout=20)
|
||||
|
||||
# receive
|
||||
ret = None
|
||||
if read_bytes > 0:
|
||||
# send busy byte
|
||||
ret = spi.xfer([0x00, ]*(read_bytes + 1))[1:]
|
||||
self._get_ack(spi)
|
||||
|
||||
return ret
|
||||
|
||||
def add_checksum(self, data):
|
||||
return data + bytes([reduce(lambda a, b: a ^ b, data)])
|
||||
|
||||
# ***** ST Bootloader functions *****
|
||||
|
||||
def get_bootloader_version(self) -> int:
|
||||
return self._cmd(0x01, read_bytes=1)[0]
|
||||
|
||||
def get_id(self) -> int:
|
||||
ret = self._cmd(0x02, read_bytes=3)
|
||||
assert ret[0] == 1
|
||||
return ((ret[1] << 8) + ret[2])
|
||||
|
||||
def go_cmd(self, address: int) -> None:
|
||||
self._cmd(0x21, data=[struct.pack('>I', address), ])
|
||||
|
||||
def erase(self, address: int) -> None:
|
||||
d = struct.pack('>H', address)
|
||||
self._cmd(0x44, data=[d, ])
|
||||
|
||||
# ***** panda api *****
|
||||
|
||||
def get_mcu_type(self) -> McuType:
|
||||
mcu_by_id = {mcu.config.mcu_idcode: mcu for mcu in McuType}
|
||||
return mcu_by_id[self.get_id()]
|
||||
|
||||
def global_erase(self):
|
||||
self.erase(0xFFFF)
|
||||
|
||||
def program_file(self, address, fn):
|
||||
with open(fn, 'rb') as f:
|
||||
code = f.read()
|
||||
|
||||
i = 0
|
||||
while i < len(code):
|
||||
#print(i, len(code))
|
||||
block = code[i:i+256]
|
||||
if len(block) < 256:
|
||||
block += b'\xFF' * (256 - len(block))
|
||||
|
||||
self._cmd(0x31, data=[
|
||||
struct.pack('>I', address + i),
|
||||
bytes([len(block) - 1]) + block,
|
||||
])
|
||||
#print(f"Written {len(block)} bytes to {hex(address + i)}")
|
||||
i += 256
|
||||
|
||||
def program_bootstub(self):
|
||||
self.program_file(self._mcu_type.config.bootstub_address, self._mcu_type.config.bootstub_path)
|
||||
|
||||
def program_app(self):
|
||||
self.program_file(self._mcu_type.config.app_address, self._mcu_type.config.app_path)
|
||||
|
||||
def reset(self):
|
||||
self.go_cmd(self._mcu_type.config.bootstub_address)
|
||||
Reference in New Issue
Block a user