dragonpilot 2022-11-30T09:37:39 for EON/C2

version: dragonpilot v0.9.1 beta for EON/C2
date: 2022-11-30T09:37:39
dp-dev(priv2) master commit: e2204c868277f20a263c2f93e88c42999d0b0c15
This commit is contained in:
Dragonpilot Team
2022-11-30 09:29:13 +00:00
committed by Comma Device
parent 1fdc1202d1
commit de34fa87ee
149 changed files with 9492 additions and 3631 deletions

View File

@@ -8,18 +8,23 @@ import hashlib
import datetime
import traceback
import warnings
import logging
from functools import wraps
from typing import Optional
from itertools import accumulate
from .dfu import PandaDFU, MCU_TYPE_F2, MCU_TYPE_F4, MCU_TYPE_H7 # pylint: disable=import-error
from .flash_release import flash_release # noqa pylint: disable=import-error
from .update import ensure_st_up_to_date # noqa pylint: disable=import-error
from .serial import PandaSerial # noqa pylint: disable=import-error
from .isotp import isotp_send, isotp_recv # pylint: disable=import-error
from .config import DEFAULT_FW_FN, DEFAULT_H7_FW_FN, SECTOR_SIZES_FX, SECTOR_SIZES_H7 # noqa pylint: disable=import-error
from .config import DEFAULT_FW_FN, DEFAULT_H7_FW_FN, SECTOR_SIZES_FX, SECTOR_SIZES_H7
from .dfu import PandaDFU, MCU_TYPE_F2, MCU_TYPE_F4, MCU_TYPE_H7
from .isotp import isotp_send, isotp_recv
from .spi import SpiHandle
__version__ = '0.0.10'
# setup logging
LOGLEVEL = os.environ.get('LOGLEVEL', 'INFO').upper()
logging.basicConfig(level=LOGLEVEL, format='%(message)s')
BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
DEBUG = os.getenv("PANDADEBUG") is not None
@@ -49,7 +54,7 @@ def pack_can_buffer(arr):
snds.append(b'')
idx += 1
#Apply counter to each 64 byte packet
# Apply counter to each 64 byte packet
for idx in range(len(snds)):
tx = b''
counter = 0
@@ -167,6 +172,7 @@ class Panda:
SERIAL_ESP = 1
SERIAL_LIN1 = 2
SERIAL_LIN2 = 3
SERIAL_SOM_DEBUG = 4
GMLAN_CAN2 = 1
GMLAN_CAN3 = 2
@@ -183,6 +189,7 @@ class Panda:
HW_TYPE_DOS = b'\x06'
HW_TYPE_RED_PANDA = b'\x07'
HW_TYPE_RED_PANDA_V2 = b'\x08'
HW_TYPE_TRES = b'\x09'
CAN_PACKET_VERSION = 2
HEALTH_PACKET_VERSION = 11
@@ -192,10 +199,10 @@ class Panda:
F2_DEVICES = (HW_TYPE_PEDAL, )
F4_DEVICES = (HW_TYPE_WHITE_PANDA, HW_TYPE_GREY_PANDA, HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS)
H7_DEVICES = (HW_TYPE_RED_PANDA, HW_TYPE_RED_PANDA_V2)
H7_DEVICES = (HW_TYPE_RED_PANDA, HW_TYPE_RED_PANDA_V2, HW_TYPE_TRES)
INTERNAL_DEVICES = (HW_TYPE_UNO, HW_TYPE_DOS)
HAS_OBD = (HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS, HW_TYPE_RED_PANDA, HW_TYPE_RED_PANDA_V2)
HAS_OBD = (HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS, HW_TYPE_RED_PANDA, HW_TYPE_RED_PANDA_V2, HW_TYPE_TRES)
CLOCK_SOURCE_MODE_DISABLED = 0
CLOCK_SOURCE_MODE_FREE_RUNNING = 1
@@ -213,8 +220,8 @@ class Panda:
FLAG_HYUNDAI_HYBRID_GAS = 2
FLAG_HYUNDAI_LONG = 4
FLAG_HYUNDAI_CAMERA_SCC = 8
FLAG_HYUNDAI_CANFD_HDA2 = 8
FLAG_HYUNDAI_CANFD_ALT_BUTTONS = 16
FLAG_HYUNDAI_CANFD_HDA2 = 16
FLAG_HYUNDAI_CANFD_ALT_BUTTONS = 32
FLAG_TESLA_POWERTRAIN = 1
FLAG_TESLA_LONG_CONTROL = 2
@@ -229,7 +236,7 @@ class Panda:
FLAG_GM_HW_CAM = 1
FLAG_GM_HW_CAM_LONG = 2
def __init__(self, serial: Optional[str] = None, claim: bool = True, disable_checks: bool = True):
def __init__(self, serial: Optional[str] = None, claim: bool = True, spi: bool = False, disable_checks: bool = True):
self._serial = serial
self._disable_checks = disable_checks
@@ -237,9 +244,9 @@ class Panda:
self._bcd_device = None
# connect and set mcu type
self._spi = spi
self.connect(claim)
def close(self):
self._handle.close()
self._handle = None
@@ -247,10 +254,30 @@ class Panda:
def connect(self, claim=True, wait=False):
if self._handle is not None:
self.close()
context = usb1.USBContext()
self._handle = None
if self._spi:
self._handle = SpiHandle()
# TODO implement
self._serial = "SPIDEV"
self.bootstub = False
else:
self.usb_connect(claim=claim, wait=wait)
assert self._handle is not None
self._mcu_type = self.get_mcu_type()
self.health_version, self.can_version, self.can_health_version = self.get_packets_versions()
print("connected")
# disable openpilot's heartbeat checks
if self._disable_checks:
self.set_heartbeat_disabled()
self.set_power_save(0)
def usb_connect(self, claim=True, wait=False):
context = usb1.USBContext()
while 1:
try:
for device in context.getDeviceList(skip_on_error=True):
@@ -283,16 +310,6 @@ class Panda:
break
context = usb1.USBContext() # New context needed so new devices show up
assert self._handle is not None
self._mcu_type = self.get_mcu_type()
self.health_version, self.can_version, self.can_health_version = self.get_packets_versions()
print("connected")
# disable openpilot's heartbeat checks
if self._disable_checks:
self.set_heartbeat_disabled()
self.set_power_save(0)
def reset(self, enter_bootstub=False, enter_bootloader=False, reconnect=True):
try:
if enter_bootloader:
@@ -583,9 +600,6 @@ class Panda:
# ******************* configuration *******************
def set_usb_power(self, on):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe6, int(on), 0, b'')
def set_power_save(self, power_save_enabled=0):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe7, int(power_save_enabled), 0, b'')
@@ -711,7 +725,7 @@ class Panda:
def serial_write(self, port_number, ln):
ret = 0
for i in range(0, len(ln), 0x20):
ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + ln[i:i + 0x20])
ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + bytes(ln[i:i + 0x20], 'utf-8'))
return ret
def serial_clear(self, port_number):
@@ -834,7 +848,3 @@ class Panda:
# ****************** Siren *****************
def set_siren(self, enabled):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf6, int(enabled), 0, b'')
# ****************** Debug *****************
def set_green_led(self, enabled):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf7, int(enabled), 0, b'')

View File

@@ -1,74 +0,0 @@
#!/usr/bin/env python3
import sys
import time
import requests
import json
import io
def flash_release(path=None, st_serial=None):
from panda import Panda, PandaDFU
from zipfile import ZipFile
def status(x):
print("\033[1;32;40m" + x + "\033[00m")
if st_serial is not None:
# look for Panda
panda_list = Panda.list()
if len(panda_list) == 0:
raise Exception("panda not found, make sure it's connected and your user can access it")
elif len(panda_list) > 1:
raise Exception("Please only connect one panda")
st_serial = panda_list[0]
print("Using panda with serial %s" % st_serial)
if path is None:
print("Fetching latest firmware from github.com/commaai/panda-artifacts")
r = requests.get("https://raw.githubusercontent.com/commaai/panda-artifacts/master/latest.json")
url = json.loads(r.text)['url']
r = requests.get(url)
print("Fetching firmware from %s" % url)
path = io.BytesIO(r.content)
zf = ZipFile(path)
zf.printdir()
version = zf.read("version").decode().strip()
status("0. Preparing to flash " + str(version))
code_bootstub = zf.read("bootstub.panda.bin")
code_panda = zf.read("panda.bin")
# enter DFU mode
status("1. Entering DFU mode")
panda = Panda(st_serial)
panda.reset(enter_bootstub=True)
panda.reset(enter_bootloader=True)
time.sleep(1)
# program bootstub
status("2. Programming bootstub")
dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(st_serial))
dfu.program_bootstub(code_bootstub)
time.sleep(1)
# flash main code
status("3. Flashing main code")
panda = Panda(st_serial)
panda.flash(code=code_panda)
panda.close()
# check for connection
status("4. Verifying version")
panda = Panda(st_serial)
my_version = panda.get_version()
print("dongle id: %s" % panda.get_serial()[0])
print(my_version, "should be", version)
assert(str(version) == str(my_version))
# done!
status("6. Success!")
if __name__ == "__main__":
flash_release(*sys.argv[1:])

105
panda/python/spi.py Normal file
View File

@@ -0,0 +1,105 @@
import math
import struct
import spidev
import logging
from functools import reduce
from typing import List
# Constants
SYNC = 0x5A
HACK = 0x79
DACK = 0x85
NACK = 0x1F
CHECKSUM_START = 0xAB
MAX_RETRY_COUNT = 5
USB_MAX_SIZE = 0x40
# This mimics the handle given by libusb1 for easy interoperability
class SpiHandle:
def __init__(self):
self.spi = spidev.SpiDev() # pylint: disable=c-extension-no-member
self.spi.open(0, 0)
self.spi.max_speed_hz = 30000000
# helpers
def _calc_checksum(self, data: List[int]) -> int:
cksum = CHECKSUM_START
for b in data:
cksum ^= b
return cksum
def _transfer(self, endpoint: int, data, max_rx_len: int = 1000) -> bytes:
logging.debug("starting transfer: endpoint=%d, max_rx_len=%d", endpoint, max_rx_len)
logging.debug("==============================================")
for n in range(MAX_RETRY_COUNT):
logging.debug("\ntry #%d", n+1)
try:
logging.debug("- send header")
packet = struct.pack("<BBHH", SYNC, endpoint, len(data), max_rx_len)
packet += bytes([reduce(lambda x, y: x^y, packet) ^ CHECKSUM_START])
self.spi.xfer2(packet)
logging.debug("- waiting for ACK")
# TODO: add timeout?
dat = b"\x00"
while dat[0] not in [HACK, NACK]:
dat = self.spi.xfer2(b"\x12")
if dat[0] == NACK:
raise Exception("Got NACK response for header")
# send data
logging.debug("- sending data")
packet = bytes([*data, self._calc_checksum(data)])
self.spi.xfer2(packet)
logging.debug("- waiting for ACK")
dat = b"\x00"
while dat[0] not in [DACK, NACK]:
dat = self.spi.xfer2(b"\xab")
if dat[0] == NACK:
raise Exception("Got NACK response for data")
# get response length, then response
response_len_bytes = bytes(self.spi.xfer2(b"\x00" * 2))
response_len = struct.unpack("<H", response_len_bytes)[0]
logging.debug("- receiving response")
dat = bytes(self.spi.xfer2(b"\x00" * (response_len + 1)))
if self._calc_checksum([DACK, *response_len_bytes, *dat]) != 0:
raise Exception("SPI got bad checksum")
return dat[:-1]
except Exception:
logging.exception("SPI transfer failed, %d retries left", n)
raise Exception(f"SPI transaction failed {MAX_RETRY_COUNT} times")
# libusb1 functions
def close(self):
self.spi.close()
def controlWrite(self, request_type: int, request: int, value: int, index: int, data, timeout: int = 0):
return self._transfer(0, struct.pack("<BHHH", request, value, index, 0))
def controlRead(self, request_type: int, request: int, value: int, index: int, length: int, timeout: int = 0):
return self._transfer(0, struct.pack("<BHHH", request, value, index, length))
# TODO: implement these properly
def bulkWrite(self, endpoint: int, data: List[int], timeout: int = 0) -> int:
for x in range(math.ceil(len(data) / USB_MAX_SIZE)):
self._transfer(endpoint, data[USB_MAX_SIZE*x:USB_MAX_SIZE*(x+1)])
return len(data)
def bulkRead(self, endpoint: int, length: int, timeout: int = 0) -> bytes:
ret: List[int] = []
for _ in range(math.ceil(length / USB_MAX_SIZE)):
d = self._transfer(endpoint, [], max_rx_len=USB_MAX_SIZE)
ret += d
if len(d) < USB_MAX_SIZE:
break
return bytes(ret)

View File

@@ -1,44 +0,0 @@
#!/usr/bin/env python3
import os
import time
def ensure_st_up_to_date():
from panda import Panda, PandaDFU, BASEDIR
with open(os.path.join(BASEDIR, "VERSION")) as f:
repo_version = f.read()
repo_version += "-EON" if os.path.isfile('/EON') else "-DEV"
panda = None
panda_dfu = None
while 1:
# break on normal mode Panda
panda_list = Panda.list()
if len(panda_list) > 0:
panda = Panda(panda_list[0])
break
# flash on DFU mode Panda
panda_dfu = PandaDFU.list()
if len(panda_dfu) > 0:
panda_dfu = PandaDFU(panda_dfu[0])
panda_dfu.recover()
print("waiting for board...")
time.sleep(1)
if panda.bootstub or not panda.get_version().startswith(repo_version):
panda.flash()
if panda.bootstub:
panda.recover()
assert(not panda.bootstub)
version = str(panda.get_version())
print("%s should be %s" % (version, repo_version))
assert(version.startswith(repo_version))
if __name__ == "__main__":
ensure_st_up_to_date()