SPI kernel driver (#1497)

* kernel driver

* fix checksum check

* cleanup

---------

Co-authored-by: Comma Device <device@comma.ai>
This commit is contained in:
Adeeb Shihadeh
2023-07-15 12:26:24 -07:00
committed by GitHub
parent a9a3227008
commit 70c7763124
9 changed files with 1192 additions and 1 deletions

View File

@@ -1,4 +1,5 @@
import binascii
import ctypes
import os
import fcntl
import math
@@ -16,8 +17,10 @@ from .utils import crc8_pedal
try:
import spidev
import spidev2
except ImportError:
spidev = None
spidev2 = None
# Constants
SYNC = 0x5A
@@ -55,6 +58,17 @@ class PandaSpiTransferFailed(PandaSpiException):
SPI_LOCK = threading.Lock()
class PandaSpiTransfer(ctypes.Structure):
_fields_ = [
('rx_buf', ctypes.c_uint64),
('tx_buf', ctypes.c_uint64),
('tx_length', ctypes.c_uint32),
('rx_length_max', ctypes.c_uint32),
('timeout', ctypes.c_uint32),
('endpoint', ctypes.c_uint8),
('expect_disconnect', ctypes.c_uint8),
]
class SpiDevice:
"""
Provides locked, thread-safe access to a panda's SPI interface.
@@ -90,6 +104,7 @@ class SpiDevice:
self._spidev.close()
class PandaSpiHandle(BaseHandle):
"""
A class that mimics a libusb1 handle for panda SPI communications.
@@ -97,6 +112,21 @@ class PandaSpiHandle(BaseHandle):
def __init__(self):
self.dev = SpiDevice()
self._transfer = self._transfer_spidev
if "KERN" in os.environ:
self._transfer = self._transfer_kernel_driver
self.tx_buf = bytearray(1024)
self.rx_buf = bytearray(1024)
tx_buf_raw = ctypes.c_char.from_buffer(self.tx_buf)
rx_buf_raw = ctypes.c_char.from_buffer(self.rx_buf)
self.ioctl_data = PandaSpiTransfer()
self.ioctl_data.tx_buf = ctypes.addressof(tx_buf_raw)
self.ioctl_data.rx_buf = ctypes.addressof(rx_buf_raw)
self.fileno = self.dev._spidev.fileno()
# helpers
def _calc_checksum(self, data: List[int]) -> int:
cksum = CHECKSUM_START
@@ -117,7 +147,23 @@ class PandaSpiHandle(BaseHandle):
raise PandaSpiMissingAck
def _transfer(self, spi, endpoint: int, data, timeout: int, max_rx_len: int = 1000, expect_disconnect: bool = False) -> bytes:
def _transfer_kernel_driver(self, spi, endpoint: int, data, timeout: int, max_rx_len: int = 1000, expect_disconnect: bool = False) -> bytes:
self.tx_buf[:len(data)] = data
self.ioctl_data.endpoint = endpoint
self.ioctl_data.tx_length = len(data)
self.ioctl_data.rx_length_max = max_rx_len
self.ioctl_data.expect_disconnect = int(expect_disconnect)
# TODO: use our own ioctl request
try:
ret = fcntl.ioctl(self.fileno, spidev2.SPI_IOC_RD_LSB_FIRST, self.ioctl_data)
except OSError as e:
raise PandaSpiException from e
if ret < 0:
raise PandaSpiException(f"ioctl returned {ret}")
return bytes(self.rx_buf[:ret])
def _transfer_spidev(self, spi, endpoint: int, data, timeout: int, max_rx_len: int = 1000, expect_disconnect: bool = False) -> bytes:
logging.debug("starting transfer: endpoint=%d, max_rx_len=%d", endpoint, max_rx_len)
logging.debug("==============================================")