lp-dp 2023-08-02T01:57:46 for EON/C2

version: lp-dp v0.9.4 for EON/C2
date: 2023-08-02T01:57:46
commit: 63c51d4b134e443b1ffa59dfe0fc2d50d5e0f446
This commit is contained in:
Vehicle Researcher
2023-08-02 01:56:06 +00:00
parent 0ffb1567bc
commit 9ca1ac0042
250 changed files with 7113 additions and 3599 deletions

View File

@@ -17,7 +17,7 @@ from .base import BaseHandle
from .constants import FW_PATH, McuType
from .dfu import PandaDFU
from .isotp import isotp_send, isotp_recv
from .spi import PandaSpiHandle, PandaSpiException
from .spi import PandaSpiHandle, PandaSpiException, PandaProtocolMismatch
from .usb import PandaUsbHandle
__version__ = '0.0.10'
@@ -26,7 +26,6 @@ __version__ = '0.0.10'
LOGLEVEL = os.environ.get('LOGLEVEL', 'INFO').upper()
logging.basicConfig(level=LOGLEVEL, format='%(message)s')
USBPACKET_MAX_SIZE = 0x40
CANPACKET_HEAD_SIZE = 0x6
DLC_TO_LEN = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64]
LEN_TO_DLC = {length: dlc for (dlc, length) in enumerate(DLC_TO_LEN)}
@@ -122,6 +121,24 @@ def ensure_can_health_packet_version(fn):
return fn(self, *args, **kwargs)
return wrapper
def parse_timestamp(dat):
a = struct.unpack("HBBBBBB", dat)
if a[0] == 0:
return None
try:
return datetime.datetime(a[0], a[1], a[2], a[4], a[5], a[6])
except ValueError:
return None
def unpack_log(dat):
return {
'id': struct.unpack("H", dat[:2])[0],
'timestamp': parse_timestamp(dat[2:10]),
'uptime': struct.unpack("I", dat[10:14])[0],
'msg': bytes(dat[14:]).decode('utf-8', 'ignore').strip('\x00'),
}
class ALTERNATIVE_EXPERIENCE:
DEFAULT = 0
DISABLE_DISENGAGE_ON_GAS = 1
@@ -184,12 +201,12 @@ class Panda:
CAN_PACKET_VERSION = 4
HEALTH_PACKET_VERSION = 14
CAN_HEALTH_PACKET_VERSION = 4
# dp - 2 extra "B" at the end:
CAN_HEALTH_PACKET_VERSION = 5
#dp - 2 extra "B" at the end:
# "usb_power_mode": a[23],
# "torque_interceptor_detected": a[24],
HEALTH_STRUCT = struct.Struct("<IIIIIIIIIBBBBBBHBBBHfBBHBHHBB")
CAN_HEALTH_STRUCT = struct.Struct("<BIBBBBBBBBIIIIIIIHHBBB")
CAN_HEALTH_STRUCT = struct.Struct("<BIBBBBBBBBIIIIIIIHHBBBIIII")
F2_DEVICES = (HW_TYPE_PEDAL, )
F4_DEVICES = (HW_TYPE_WHITE_PANDA, HW_TYPE_GREY_PANDA, HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS)
@@ -240,6 +257,7 @@ class Panda:
FLAG_GM_HW_CAM_LONG = 2
FLAG_FORD_LONG_CONTROL = 1
FLAG_FORD_CANFD = 2
def __init__(self, serial: Optional[str] = None, claim: bool = True, disable_checks: bool = True):
self._connect_serial = serial
@@ -269,10 +287,14 @@ class Panda:
def connect(self, claim=True, wait=False):
self.close()
# try USB first, then SPI
self._handle, serial, self.bootstub, bcd = self.usb_connect(self._connect_serial, claim=claim, wait=wait)
if self._handle is None:
self._handle, serial, self.bootstub, bcd = self.spi_connect(self._connect_serial)
self._handle = None
while self._handle is None:
# try USB first, then SPI
self._handle, serial, self.bootstub, bcd = self.usb_connect(self._connect_serial, claim=claim)
if self._handle is None:
self._handle, serial, self.bootstub, bcd = self.spi_connect(self._connect_serial)
if not wait:
break
if self._handle is None:
raise Exception("failed to connect to panda")
@@ -307,16 +329,30 @@ class Panda:
self.set_power_save(0)
@staticmethod
def spi_connect(serial):
def spi_connect(serial, ignore_version=False):
# get UID to confirm slave is present and up
handle = None
spi_serial = None
bootstub = None
spi_version = None
try:
handle = PandaSpiHandle()
dat = handle.controlRead(Panda.REQUEST_IN, 0xc3, 0, 0, 12, timeout=100)
spi_serial = binascii.hexlify(dat).decode()
bootstub = Panda.flasher_present(handle)
# connect by protcol version
try:
dat = handle.get_protocol_version()
spi_serial = binascii.hexlify(dat[:12]).decode()
pid = dat[13]
if pid not in (0xcc, 0xee):
raise PandaSpiException("invalid bootstub status")
bootstub = pid == 0xee
spi_version = dat[14]
except PandaSpiException:
# fallback, we'll raise a protocol mismatch below
dat = handle.controlRead(Panda.REQUEST_IN, 0xc3, 0, 0, 12, timeout=100)
spi_serial = binascii.hexlify(dat).decode()
bootstub = Panda.flasher_present(handle)
spi_version = 0
except PandaSpiException:
pass
@@ -326,49 +362,53 @@ class Panda:
spi_serial = None
bootstub = False
# ensure our protocol version matches the panda
if handle is not None and not ignore_version:
if spi_version != handle.PROTOCOL_VERSION:
err = f"panda protocol mismatch: expected {handle.PROTOCOL_VERSION}, got {spi_version}. reflash panda"
raise PandaProtocolMismatch(err)
return handle, spi_serial, bootstub, None
@staticmethod
def usb_connect(serial, claim=True, wait=False):
def usb_connect(serial, claim=True):
handle, usb_serial, bootstub, bcd = None, None, None, None
while 1:
context = usb1.USBContext()
context.open()
try:
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0xbbaa and device.getProductID() in (0xddcc, 0xddee):
try:
this_serial = device.getSerialNumber()
except Exception:
continue
context = usb1.USBContext()
context.open()
try:
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0xbbaa and device.getProductID() in (0xddcc, 0xddee):
try:
this_serial = device.getSerialNumber()
except Exception:
continue
if serial is None or this_serial == serial:
logging.debug("opening device %s %s", this_serial, hex(device.getProductID()))
if serial is None or this_serial == serial:
logging.debug("opening device %s %s", this_serial, hex(device.getProductID()))
usb_serial = this_serial
bootstub = device.getProductID() == 0xddee
handle = device.open()
if sys.platform not in ("win32", "cygwin", "msys", "darwin"):
handle.setAutoDetachKernelDriver(True)
if claim:
handle.claimInterface(0)
# handle.setInterfaceAltSetting(0, 0) # Issue in USB stack
usb_serial = this_serial
bootstub = device.getProductID() == 0xddee
handle = device.open()
if sys.platform not in ("win32", "cygwin", "msys", "darwin"):
handle.setAutoDetachKernelDriver(True)
if claim:
handle.claimInterface(0)
# handle.setInterfaceAltSetting(0, 0) # Issue in USB stack
# bcdDevice wasn't always set to the hw type, ignore if it's the old constant
this_bcd = device.getbcdDevice()
if this_bcd is not None and this_bcd != 0x2300:
bcd = bytearray([this_bcd >> 8, ])
# bcdDevice wasn't always set to the hw type, ignore if it's the old constant
this_bcd = device.getbcdDevice()
if this_bcd is not None and this_bcd != 0x2300:
bcd = bytearray([this_bcd >> 8, ])
break
except Exception:
logging.exception("USB connect error")
if not wait or handle is not None:
break
context.close()
break
except Exception:
logging.exception("USB connect error")
usb_handle = None
if handle is not None:
usb_handle = PandaUsbHandle(handle)
else:
context.close()
return usb_handle, usb_serial, bootstub, bcd
@@ -399,7 +439,7 @@ class Panda:
@staticmethod
def spi_list():
_, serial, _, _ = Panda.spi_connect(None)
_, serial, _, _ = Panda.spi_connect(None, ignore_version=True)
if serial is not None:
return [serial, ]
return []
@@ -409,12 +449,12 @@ class Panda:
timeout = 5000 if isinstance(self._handle, PandaSpiHandle) else 15000
try:
if enter_bootloader:
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 0, 0, b'', timeout=timeout)
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 0, 0, b'', timeout=timeout, expect_disconnect=True)
else:
if enter_bootstub:
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 1, 0, b'', timeout=timeout)
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 1, 0, b'', timeout=timeout, expect_disconnect=True)
else:
self._handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'', timeout=timeout)
self._handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'', timeout=timeout, expect_disconnect=True)
except Exception:
pass
if not enter_bootloader and reconnect:
@@ -483,7 +523,7 @@ class Panda:
# reset
logging.warning("flash: resetting")
try:
handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'')
handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'', expect_disconnect=True)
except Exception:
pass
@@ -540,6 +580,18 @@ class Panda:
dfu_list = PandaDFU.list()
return True
@staticmethod
def wait_for_panda(serial: Optional[str], timeout: int) -> bool:
t_start = time.monotonic()
serials = Panda.list()
while (serial is None and len(serials) == 0) or (serial is not None and serial not in serials):
logging.debug("waiting for panda...")
time.sleep(0.1)
if timeout is not None and (time.monotonic() - t_start) > timeout:
return False
serials = Panda.list()
return True
def up_to_date(self) -> bool:
current = self.get_signature()
fn = os.path.join(FW_PATH, self.get_mcu_type().config.app_fn)
@@ -624,6 +676,10 @@ class Panda:
"canfd_enabled": a[19],
"brs_enabled": a[20],
"canfd_non_iso": a[21],
"irq0_call_rate": a[22],
"irq1_call_rate": a[23],
"irq2_call_rate": a[24],
"can_core_reset_count": a[25],
}
# ******************* control *******************
@@ -710,6 +766,10 @@ class Panda:
def get_secret(self):
return self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 1, 0, 0x10)
def get_interrupt_call_rate(self, irqnum):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xc4, int(irqnum), 0, 4)
return struct.unpack("I", dat)[0]
# ******************* configuration *******************
def set_power_save(self, power_save_enabled=0):
@@ -933,8 +993,7 @@ class Panda:
def get_datetime(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xa0, 0, 0, 8)
a = struct.unpack("HBBBBBB", dat)
return datetime.datetime(a[0], a[1], a[2], a[4], a[5], a[6])
return parse_timestamp(dat)
# ****************** Timer *****************
def get_microsecond_timer(self):
@@ -965,3 +1024,15 @@ class Panda:
# ****************** Debug *****************
def set_green_led(self, enabled):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf7, int(enabled), 0, b'')
# ****************** Logging *****************
def get_logs(self, last_id=None, get_all=False):
assert (last_id is None) or (0 <= last_id < 0xFFFF)
logs = []
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xfd, 1 if get_all else 0, last_id if last_id is not None else 0xFFFF, 0x40)
while len(dat) > 0:
if len(dat) == 0x40:
logs.append(unpack_log(dat))
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xfd, 0, 0xFFFF, 0x40)
return logs