# python library to interface with panda
import datetime
import hashlib
import os
import struct
import sys
import time
import traceback
from functools import wraps

import usb1

from .dfu import PandaDFU, MCU_TYPE_F2, MCU_TYPE_F4, MCU_TYPE_H7  # pylint: disable=import-error
from .flash_release import flash_release  # noqa pylint: disable=import-error
from .update import ensure_st_up_to_date  # noqa pylint: disable=import-error
from .serial import PandaSerial  # noqa pylint: disable=import-error
from .isotp import isotp_send, isotp_recv  # pylint: disable=import-error
from .config import DEFAULT_FW_FN, DEFAULT_H7_FW_FN  # noqa pylint: disable=import-error
__version__ = '0.0.10'

BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")

# Debug tracing is enabled by the mere presence of PANDADEBUG in the environment.
DEBUG = os.getenv("PANDADEBUG") is not None

# Number of header bytes in front of every CAN payload.
CANPACKET_HEAD_SIZE = 0x5
# Payload length selected by each 4-bit data length code, and the reverse map.
DLC_TO_LEN = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64]
LEN_TO_DLC = {length: dlc for (dlc, length) in enumerate(DLC_TO_LEN)}

# Framing constants used by the two functions below.
_USB_PACKET_SIZE = 64   # one counter byte followed by payload bytes
_USB_PAYLOAD_SIZE = 63  # payload bytes carried per counted packet
_CHUNK_LIMIT = 256      # a new chunk is opened once the current one exceeds this


def _frame_usb_packets(payload):
    """Return *payload* cut into 63-byte slices, each prefixed by a running counter byte."""
    return b''.join(
        bytes([counter]) + payload[start:start + _USB_PAYLOAD_SIZE]
        for counter, start in enumerate(range(0, len(payload), _USB_PAYLOAD_SIZE))
    )


def pack_can_buffer(arr):
    """Serialize CAN messages into counted chunks.

    Args:
        arr: iterable of ``(address, _, dat, bus)`` tuples. The second element
            is ignored. ``len(dat)`` must be one of the lengths in
            ``DLC_TO_LEN``.

    Returns:
        A list of ``bytes`` objects. Each message contributes a 5-byte header
        plus its payload. A new chunk is started after the current one has
        grown past 256 bytes, so a chunk can exceed that size by part of one
        message and the last chunk may be empty (``[b'']`` for empty input).
        Inside a chunk every 63-byte slice is preceded by a counter byte that
        starts at 0.

    Raises:
        AssertionError: if a payload length has no data length code.
    """
    chunks = [bytearray()]
    for address, _, dat, bus in arr:
        assert len(dat) in LEN_TO_DLC
        if DEBUG:
            print(f" W 0x{address:x}: 0x{dat.hex()}")
        # Addresses from 0x800 upward are flagged as extended.
        extended = 1 if address >= 0x800 else 0
        data_len_code = LEN_TO_DLC[len(dat)]
        header = bytearray(5)
        word_4b = address << 3 | extended << 2
        header[0] = (data_len_code << 4) | (bus << 1)
        header[1] = word_4b & 0xFF
        header[2] = (word_4b >> 8) & 0xFF
        header[3] = (word_4b >> 16) & 0xFF
        header[4] = (word_4b >> 24) & 0xFF
        current = chunks[-1]
        current += header + dat  # in-place extend instead of rebuilding bytes
        if len(current) > _CHUNK_LIMIT:
            chunks.append(bytearray())
    return [_frame_usb_packets(chunk) for chunk in chunks]


def unpack_can_buffer(dat):
    """Decode a buffer of counted 64-byte packets into CAN messages.

    Args:
        dat: bytes-like buffer. Byte 0 of every 64-byte packet is a counter
            that must start at 0 and increase by one per packet.

    Returns:
        A list of ``(address, 0, data, bus)`` tuples. ``bus`` is increased by
        128 when bit 1 of the second header byte is set and by 192 when bit 0
        is set. Decoding stops (after printing a notice) at the first packet
        whose counter does not match. A message split across two packets is
        completed with the bytes of the following packet.
    """
    ret = []
    counter = 0
    tail = bytearray()
    for i in range(0, len(dat), _USB_PACKET_SIZE):
        if counter != dat[i]:
            print("CAN: LOST RECV PACKET COUNTER")
            break
        counter += 1
        # Leftover bytes of the previous packet come first.
        chunk = tail + dat[i + 1:i + _USB_PACKET_SIZE]
        tail = bytearray()
        pos = 0
        while pos < len(chunk):
            # The data length code sits in the high nibble of the first header byte.
            data_len = DLC_TO_LEN[chunk[pos] >> 4]
            pckt_len = CANPACKET_HEAD_SIZE + data_len
            if pckt_len > len(chunk) - pos:
                # Incomplete message: keep it for the next packet.
                tail = chunk[pos:]
                break
            header = chunk[pos:pos + CANPACKET_HEAD_SIZE]
            bus = (header[0] >> 1) & 0x7
            address = (header[4] << 24 | header[3] << 16 | header[2] << 8 | header[1]) >> 3
            returned = (header[1] >> 1) & 0x1
            rejected = header[1] & 0x1
            data = chunk[pos + CANPACKET_HEAD_SIZE:pos + pckt_len]
            if returned:
                bus += 128
            if rejected:
                bus += 192
            if DEBUG:
                print(f" R 0x{address:x}: 0x{data.hex()}")
            ret.append((address, 0, data, bus))
            pos += pckt_len
    return ret


def _check_packet_version(device_version, library_version, kind):
    """Raise RuntimeError unless *device_version* equals *library_version*."""
    if device_version < library_version:
        raise RuntimeError(f"Panda firmware has outdated {kind} packet definition. Reflash panda firmware.")
    if device_version > library_version:
        raise RuntimeError(f"Panda python library has outdated {kind} packet definition. Update panda python library.")


def ensure_health_packet_version(fn):
    """Decorate a method so it runs only when the health packet versions agree.

    The wrapper compares ``self.health_version`` with
    ``self.HEALTH_PACKET_VERSION`` and raises ``RuntimeError`` on a mismatch.
    """
    @wraps(fn)
    def wrapper(self, *args, **kwargs):
        _check_packet_version(self.health_version, self.HEALTH_PACKET_VERSION, "health")
        return fn(self, *args, **kwargs)
    return wrapper


def ensure_can_packet_version(fn):
    """Decorate a method so it runs only when the CAN packet versions agree.

    The wrapper compares ``self.can_version`` with ``self.CAN_PACKET_VERSION``
    and raises ``RuntimeError`` on a mismatch.
    """
    @wraps(fn)
    def wrapper(self, *args, **kwargs):
        _check_packet_version(self.can_version, self.CAN_PACKET_VERSION, "CAN")
        return fn(self, *args, **kwargs)
    return wrapper
class Panda(object):
    """Handle to one panda device reached over USB through ``usb1``.

    Creating an instance opens the first device with vendor id 0xbbaa and
    product id 0xddcc or 0xddee whose serial number equals ``serial`` (any
    serial when ``serial`` is None).
    """

    # matches cereal.car.CarParams.SafetyModel
    SAFETY_SILENT = 0
    SAFETY_HONDA_NIDEC = 1
    SAFETY_TOYOTA = 2
    SAFETY_ELM327 = 3
    SAFETY_GM = 4
    SAFETY_HONDA_BOSCH_GIRAFFE = 5
    SAFETY_FORD = 6
    SAFETY_HYUNDAI = 8
    SAFETY_CHRYSLER = 9
    SAFETY_TESLA = 10
    SAFETY_SUBARU = 11
    SAFETY_MAZDA = 13
    SAFETY_NISSAN = 14
    SAFETY_VOLKSWAGEN_MQB = 15
    SAFETY_ALLOUTPUT = 17
    SAFETY_GM_ASCM = 18
    SAFETY_NOOUTPUT = 19
    SAFETY_HONDA_BOSCH = 20
    SAFETY_VOLKSWAGEN_PQ = 21
    SAFETY_SUBARU_LEGACY = 22
    SAFETY_HYUNDAI_LEGACY = 23

    SERIAL_DEBUG = 0
    SERIAL_ESP = 1
    SERIAL_LIN1 = 2
    SERIAL_LIN2 = 3

    GMLAN_CAN2 = 1
    GMLAN_CAN3 = 2

    REQUEST_IN = usb1.ENDPOINT_IN | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
    REQUEST_OUT = usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE

    HW_TYPE_UNKNOWN = b'\x00'
    HW_TYPE_WHITE_PANDA = b'\x01'
    HW_TYPE_GREY_PANDA = b'\x02'
    HW_TYPE_BLACK_PANDA = b'\x03'
    HW_TYPE_PEDAL = b'\x04'
    HW_TYPE_UNO = b'\x05'
    HW_TYPE_DOS = b'\x06'
    HW_TYPE_RED_PANDA = b'\x07'

    # Versions compared by ensure_can_packet_version / ensure_health_packet_version.
    CAN_PACKET_VERSION = 2
    HEALTH_PACKET_VERSION = 1

    F2_DEVICES = [HW_TYPE_PEDAL]
    F4_DEVICES = [HW_TYPE_WHITE_PANDA, HW_TYPE_GREY_PANDA, HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS]
    H7_DEVICES = [HW_TYPE_RED_PANDA]

    CLOCK_SOURCE_MODE_DISABLED = 0
    CLOCK_SOURCE_MODE_FREE_RUNNING = 1
    CLOCK_SOURCE_MODE_EXTERNAL_SYNC = 2

    FLAG_HONDA_ALT_BRAKE = 1
    FLAG_HONDA_BOSCH_LONG = 2
    FLAG_HONDA_NIDEC_ALT = 4

    FLAG_HYUNDAI_EV_GAS = 1
    FLAG_HYUNDAI_HYBRID_GAS = 2
    FLAG_HYUNDAI_LONG = 4

    FLAG_TESLA_POWERTRAIN = 1
    FLAG_TESLA_LONG_CONTROL = 2

    def __init__(self, serial=None, claim=True):
        """Connect to a device and cache its MCU type.

        Args:
            serial: serial number to look for, or None to take the first match.
            claim: forwarded to :meth:`connect`.
        """
        self._serial = serial
        self._handle = None
        self.connect(claim)
        self._mcu_type = self.get_mcu_type()

    def close(self):
        """Close the USB handle; does nothing when no handle is open."""
        if self._handle is not None:
            self._handle.close()
            self._handle = None

    def connect(self, claim=True, wait=False):
        """Open the matching device and read its packet versions.

        Args:
            claim: when true, claim USB interface 0 after opening.
            wait: when true, keep rescanning with a fresh USB context until a
                device has been opened; otherwise scan once.

        Raises:
            AssertionError: if no device was opened.
        """
        if self._handle is not None:
            self.close()

        context = usb1.USBContext()
        self._handle = None

        while True:
            try:
                for device in context.getDeviceList(skip_on_error=True):
                    if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]:
                        try:
                            this_serial = device.getSerialNumber()
                        except Exception:
                            # Serial not readable: try the next device.
                            continue
                        if self._serial is None or this_serial == self._serial:
                            self._serial = this_serial
                            print("opening device", self._serial, hex(device.getProductID()))
                            # Product id 0xddee is recorded as "bootstub".
                            self.bootstub = device.getProductID() == 0xddee
                            self._handle = device.open()
                            if sys.platform not in ["win32", "cygwin", "msys", "darwin"]:
                                self._handle.setAutoDetachKernelDriver(True)
                            if claim:
                                self._handle.claimInterface(0)
                                # self._handle.setInterfaceAltSetting(0, 0)  # Issue in USB stack
                            break
            except Exception as e:
                print("exception", e)
                traceback.print_exc()
            if not wait or self._handle is not None:
                break
            context = usb1.USBContext()  # New context needed so new devices show up

        assert(self._handle is not None)
        self.health_version, self.can_version = self.get_packets_versions()
        print("connected")

    def reset(self, enter_bootstub=False, enter_bootloader=False):
        """Send a reset request and, unless entering the bootloader, reconnect.

        Request 0xd1 with value 0 is sent for ``enter_bootloader``, 0xd1 with
        value 1 for ``enter_bootstub``, and 0xd8 otherwise. Any exception from
        the control transfer is ignored (presumably because the device leaves
        the bus while it resets -- TODO confirm).
        """
        try:
            if enter_bootloader:
                self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 0, 0, b'')
            elif enter_bootstub:
                self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 1, 0, b'')
            else:
                self._handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'')
        except Exception:
            pass
        if not enter_bootloader:
            self.reconnect()

    def reconnect(self):
        """Close the handle and retry :meth:`connect` for up to 15 attempts.

        After each failed attempt a DFU recovery is tried (its errors are
        ignored) and the method sleeps one second.

        Raises:
            Exception: "reconnect failed" when every attempt failed.
        """
        self.close()
        time.sleep(1.0)

        success = False
        # wait up to 15 seconds
        for i in range(0, 15):
            try:
                self.connect()
                success = True
                break
            except Exception:
                print("reconnecting is taking %d seconds..." % (i + 1))
                try:
                    dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial, self._mcu_type))
                    dfu.recover()
                except Exception:
                    pass
                time.sleep(1.0)
        if not success:
            raise Exception("reconnect failed")

    @staticmethod
    def flash_static(handle, code):
        """Write *code* to the device behind *handle* and reset it.

        Args:
            handle: open USB device handle.
            code: bytes-like firmware image, sent in 0x10-byte slices.

        Raises:
            AssertionError: if bytes 4..8 of the 0xb0 reply are not
                ``de ad d0 0d``.
        """
        # confirm flasher is present
        fr = handle.controlRead(Panda.REQUEST_IN, 0xb0, 0, 0, 0xc)
        assert fr[4:8] == b"\xde\xad\xd0\x0d"

        # unlock flash
        print("flash: unlocking")
        handle.controlWrite(Panda.REQUEST_IN, 0xb1, 0, 0, b'')

        # erase sectors 1 through 3
        print("flash: erasing")
        for i in range(1, 4):
            handle.controlWrite(Panda.REQUEST_IN, 0xb2, i, 0, b'')

        # flash over EP2
        STEP = 0x10
        print("flash: flashing")
        for i in range(0, len(code), STEP):
            handle.bulkWrite(2, code[i:i + STEP])

        # reset
        print("flash: resetting")
        try:
            handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'')
        except Exception:
            pass

    def flash(self, fn=DEFAULT_FW_FN, code=None, reconnect=True):
        """Flash firmware, entering bootstub first when necessary.

        Args:
            fn: firmware file read when ``code`` is None. The default is
                swapped for ``DEFAULT_H7_FW_FN`` when the MCU type is H7.
            code: firmware image; overrides ``fn`` when given.
            reconnect: reconnect after flashing when true.
        """
        if self._mcu_type == MCU_TYPE_H7 and fn == DEFAULT_FW_FN:
            fn = DEFAULT_H7_FW_FN
        print("flash: main version is " + self.get_version())
        if not self.bootstub:
            self.reset(enter_bootstub=True)
        assert(self.bootstub)

        if code is None:
            with open(fn, "rb") as f:
                code = f.read()

        # get version
        print("flash: bootstub version is " + self.get_version())

        # do flash
        Panda.flash_static(self._handle, code)

        # reconnect
        if reconnect:
            self.reconnect()

    def recover(self, timeout=None):
        """Run a DFU recovery followed by a reflash.

        Args:
            timeout: seconds to wait for a DFU device, or None to wait forever.

        Returns:
            False if no DFU device appeared within ``timeout``, else True.
        """
        self.reset(enter_bootstub=True)
        self.reset(enter_bootloader=True)
        t_start = time.time()
        while len(PandaDFU.list()) == 0:
            print("waiting for DFU...")
            time.sleep(0.1)
            if timeout is not None and (time.time() - t_start) > timeout:
                return False

        dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial, self._mcu_type))
        dfu.recover()

        # reflash after recover
        self.connect(True, True)
        self.flash()
        return True

    @staticmethod
    def list():
        """Return the serial numbers of all attached matching devices.

        Devices whose serial cannot be read are skipped; an error while
        enumerating ends the scan and returns what was collected so far.
        """
        context = usb1.USBContext()
        ret = []
        try:
            for device in context.getDeviceList(skip_on_error=True):
                if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]:
                    try:
                        ret.append(device.getSerialNumber())
                    except Exception:
                        continue
        except Exception:
            pass
        return ret

    def call_control_api(self, msg):
        """Send control request *msg* with no value, index or payload."""
        self._handle.controlWrite(Panda.REQUEST_OUT, msg, 0, 0, b'')

    # ******************* health *******************

    # NOTE(review): SOURCE is damaged at this point. The text between the
    # opening quote of the struct format string in health() and the
    # `len(ret) > 0` test inside a K-line receive loop is missing, so health()
    # and every definition that followed it up to that loop could not be
    # reproduced and must be restored from the original file. Members called
    # in this class but not defined in the visible text (presumably part of
    # the missing span): get_mcu_type, get_packets_versions, get_version,
    # kline_drain, kline_ll_recv.
    # Surviving fragments, verbatim:
    #   @ensure_health_packet_version
    #   def health(self):
    #     dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd2, 0, 0, 44)
    #     a = struct.unpack("
    #   [... missing ...]
    #      0: print(f"kline recv: 0x{ret.hex()}")
    #     echo += ret
    #     return bytes(echo)

    def kline_send(self, x, bus=2, checksum=True):
        """Send *x* on a K-line bus and verify the echo of every slice.

        Args:
            x: bytes-like message. It is never modified.
            bus: bus number, sent as the first byte of every bulk write.
            checksum: when true, append ``sum(x) % 0x100`` to the message.

        Raises:
            AssertionError: if the echoed bytes differ from what was sent.
        """
        self.kline_drain(bus=bus)
        if checksum:
            # Build a new object: `x += ...` would extend a caller-owned
            # bytearray in place.
            x = bytes(x) + bytes([sum(x) % 0x100])
        for i in range(0, len(x), 0xf):
            ts = x[i:i + 0xf]
            if DEBUG:
                print(f"kline send: 0x{ts.hex()}")
            self._handle.bulkWrite(2, bytes([bus]) + ts)
            echo = self.kline_ll_recv(len(ts), bus=bus)
            if echo != ts:
                print(f"**** ECHO ERROR {i} ****")
                print(f"0x{echo.hex()}")
                print(f"0x{ts.hex()}")
            assert echo == ts

    def kline_recv(self, bus=2, header_len=4):
        """Receive one K-line message: header, data and checksum byte."""
        # read header (last byte is length)
        msg = self.kline_ll_recv(header_len, bus=bus)
        # read data (add one byte to length for checksum)
        msg += self.kline_ll_recv(msg[-1] + 1, bus=bus)
        return msg

    def send_heartbeat(self, engaged=True):
        """Send request 0xf3 with *engaged* as its value."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xf3, engaged, 0, b'')

    # disable heartbeat checks for use outside of openpilot
    # sending a heartbeat will reenable the checks
    def set_heartbeat_disabled(self):
        """Send request 0xf8."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xf8, 0, 0, b'')

    # ******************* RTC *******************
    def set_datetime(self, dt):
        """Send the fields of *dt* with requests 0xa1 through 0xa7.

        Order: year, month, day, ISO weekday, hour, minute, second.
        """
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xa1, int(dt.year), 0, b'')
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xa2, int(dt.month), 0, b'')
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xa3, int(dt.day), 0, b'')
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xa4, int(dt.isoweekday()), 0, b'')
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xa5, int(dt.hour), 0, b'')
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xa6, int(dt.minute), 0, b'')
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xa7, int(dt.second), 0, b'')

    def get_datetime(self):
        """Read 8 bytes with request 0xa0 and return them as a ``datetime``.

        The fourth unpacked field is skipped (set_datetime sends the weekday
        in that position).
        """
        dat = self._handle.controlRead(Panda.REQUEST_IN, 0xa0, 0, 0, 8)
        # No byte-order prefix: host-native order is used.
        # NOTE(review): confirm against the device's byte order.
        a = struct.unpack("HBBBBBB", dat)
        return datetime.datetime(a[0], a[1], a[2], a[4], a[5], a[6])

    # ******************* IR *******************
    def set_ir_power(self, percentage):
        """Send request 0xb0 with ``int(percentage)``."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xb0, int(percentage), 0, b'')

    # ******************* Fan ******************
    def set_fan_power(self, percentage):
        """Send request 0xb1 with ``int(percentage)``."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xb1, int(percentage), 0, b'')

    def get_fan_rpm(self):
        """Read 2 bytes with request 0xb2 and return them as an unsigned integer."""
        dat = self._handle.controlRead(Panda.REQUEST_IN, 0xb2, 0, 0, 2)
        a = struct.unpack("H", dat)
        return a[0]

    # ****************** Phone *****************
    def set_phone_power(self, enabled):
        """Send request 0xb3 with ``int(enabled)``."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xb3, int(enabled), 0, b'')

    # ************** Clock Source **************
    def set_clock_source_mode(self, mode):
        """Send request 0xf5 with ``int(mode)``."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xf5, int(mode), 0, b'')

    # ****************** Siren *****************
    def set_siren(self, enabled):
        """Send request 0xf6 with ``int(enabled)``."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xf6, int(enabled), 0, b'')

    # ****************** Debug *****************
    def set_green_led(self, enabled):
        """Send request 0xf7 with ``int(enabled)``."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xf7, int(enabled), 0, b'')