# python library to interface with panda import datetime import struct import hashlib import socket import usb1 import os import time import traceback import sys from functools import wraps from .dfu import PandaDFU, MCU_TYPE_F2, MCU_TYPE_F4, MCU_TYPE_H7 # pylint: disable=import-error from .flash_release import flash_release # noqa pylint: disable=import-error from .update import ensure_st_up_to_date # noqa pylint: disable=import-error from .serial import PandaSerial # noqa pylint: disable=import-error from .isotp import isotp_send, isotp_recv # pylint: disable=import-error from .config import DEFAULT_FW_FN, DEFAULT_H7_FW_FN # noqa pylint: disable=import-error __version__ = '0.0.10' BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../") DEBUG = os.getenv("PANDADEBUG") is not None CANPACKET_HEAD_SIZE = 0x5 DLC_TO_LEN = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64] LEN_TO_DLC = {length: dlc for (dlc, length) in enumerate(DLC_TO_LEN)} def pack_can_buffer(arr): snds = [b''] idx = 0 for address, _, dat, bus in arr: assert len(dat) in LEN_TO_DLC if DEBUG: print(f" W 0x{address:x}: 0x{dat.hex()}") extended = 1 if address >= 0x800 else 0 data_len_code = LEN_TO_DLC[len(dat)] header = bytearray(5) word_4b = address << 3 | extended << 2 header[0] = (data_len_code << 4) | (bus << 1) header[1] = word_4b & 0xFF header[2] = (word_4b >> 8) & 0xFF header[3] = (word_4b >> 16) & 0xFF header[4] = (word_4b >> 24) & 0xFF snds[idx] += header + dat if len(snds[idx]) > 256: # Limit chunks to 256 bytes snds.append(b'') idx += 1 #Apply counter to each 64 byte packet for idx in range(len(snds)): tx = b'' counter = 0 for i in range (0, len(snds[idx]), 63): tx += bytes([counter]) + snds[idx][i:i+63] counter += 1 snds[idx] = tx return snds def unpack_can_buffer(dat): ret = [] counter = 0 tail = bytearray() for i in range(0, len(dat), 64): if counter != dat[i]: print("CAN: LOST RECV PACKET COUNTER") break counter+=1 chunk = tail + dat[i+1:i+64] tail = bytearray() pos = 0 while pos>4)] pckt_len = CANPACKET_HEAD_SIZE + data_len if pckt_len <= len(chunk[pos:]): header = chunk[pos:pos+CANPACKET_HEAD_SIZE] if len(header) < 5: print("CAN: MALFORMED USB RECV PACKET") break bus = (header[0] >> 1) & 0x7 address = (header[4] << 24 | header[3] << 16 | header[2] << 8 | header[1]) >> 3 returned = (header[1] >> 1) & 0x1 rejected = header[1] & 0x1 data = chunk[pos + CANPACKET_HEAD_SIZE:pos + CANPACKET_HEAD_SIZE + data_len] if returned: bus += 128 if rejected: bus += 192 if DEBUG: print(f" R 0x{address:x}: 0x{data.hex()}") ret.append((address, 0, data, bus)) pos += pckt_len else: tail = chunk[pos:] break return ret def ensure_health_packet_version(fn): @wraps(fn) def wrapper(self, *args, **kwargs): if self.health_version < self.HEALTH_PACKET_VERSION: raise RuntimeError("Panda firmware has outdated health packet definition. Reflash panda firmware.") elif self.health_version > self.HEALTH_PACKET_VERSION: raise RuntimeError("Panda python library has outdated health packet definition. Update panda python library.") return fn(self, *args, **kwargs) return wrapper def ensure_can_packet_version(fn): @wraps(fn) def wrapper(self, *args, **kwargs): if self.can_version < self.CAN_PACKET_VERSION: raise RuntimeError("Panda firmware has outdated CAN packet definition. Reflash panda firmware.") elif self.can_version > self.CAN_PACKET_VERSION: raise RuntimeError("Panda python library has outdated CAN packet definition. Update panda python library.") return fn(self, *args, **kwargs) return wrapper class PandaWifiStreaming(object): def __init__(self, ip="192.168.0.10", port=1338): self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock.setblocking(0) self.ip = ip self.port = port self.kick() def kick(self): # must be called at least every 5 seconds self.sock.sendto("hello", (self.ip, self.port)) def can_recv(self): ret = [] while True: try: dat, addr = self.sock.recvfrom(0x200 * 0x10) if addr == (self.ip, self.port): ret += unpack_can_buffer(dat) except socket.error as e: if e.errno != 35 and e.errno != 11: traceback.print_exc() break return ret # stupid tunneling of USB over wifi and SPI class WifiHandle(object): def __init__(self, ip="192.168.0.10", port=1337): self.sock = socket.create_connection((ip, port)) def __recv(self): ret = self.sock.recv(0x44) length = struct.unpack("I", ret[0:4])[0] return ret[4:4 + length] def controlWrite(self, request_type, request, value, index, data, timeout=0): # ignore data in reply, panda doesn't use it return self.controlRead(request_type, request, value, index, 0, timeout) def controlRead(self, request_type, request, value, index, length, timeout=0): self.sock.send(struct.pack("HHBBHHH", 0, 0, request_type, request, value, index, length)) return self.__recv() def bulkWrite(self, endpoint, data, timeout=0): if len(data) > 0x10: raise ValueError("Data must not be longer than 0x10") self.sock.send(struct.pack("HH", endpoint, len(data)) + data) self.__recv() # to /dev/null def bulkRead(self, endpoint, length, timeout=0): self.sock.send(struct.pack("HH", endpoint, 0)) return self.__recv() def close(self): self.sock.close() # *** normal mode *** class Panda(object): # matches cereal.car.CarParams.SafetyModel SAFETY_SILENT = 0 SAFETY_HONDA_NIDEC = 1 SAFETY_TOYOTA = 2 SAFETY_ELM327 = 3 SAFETY_GM = 4 SAFETY_HONDA_BOSCH_GIRAFFE = 5 SAFETY_FORD = 6 SAFETY_HYUNDAI = 8 SAFETY_CHRYSLER = 9 SAFETY_TESLA = 10 SAFETY_SUBARU = 11 SAFETY_MAZDA = 13 SAFETY_NISSAN = 14 SAFETY_VOLKSWAGEN_MQB = 15 SAFETY_ALLOUTPUT = 17 SAFETY_GM_ASCM = 18 SAFETY_NOOUTPUT = 19 SAFETY_HONDA_BOSCH = 20 SAFETY_VOLKSWAGEN_PQ = 21 SAFETY_SUBARU_LEGACY = 22 SAFETY_HYUNDAI_LEGACY = 23 SERIAL_DEBUG = 0 SERIAL_ESP = 1 SERIAL_LIN1 = 2 SERIAL_LIN2 = 3 GMLAN_CAN2 = 1 GMLAN_CAN3 = 2 REQUEST_IN = usb1.ENDPOINT_IN | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE REQUEST_OUT = usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE HW_TYPE_UNKNOWN = b'\x00' HW_TYPE_WHITE_PANDA = b'\x01' HW_TYPE_GREY_PANDA = b'\x02' HW_TYPE_BLACK_PANDA = b'\x03' HW_TYPE_PEDAL = b'\x04' HW_TYPE_UNO = b'\x05' HW_TYPE_DOS = b'\x06' HW_TYPE_RED_PANDA = b'\x07' CAN_PACKET_VERSION = 2 HEALTH_PACKET_VERSION = 1 F2_DEVICES = [HW_TYPE_PEDAL] F4_DEVICES = [HW_TYPE_WHITE_PANDA, HW_TYPE_GREY_PANDA, HW_TYPE_BLACK_PANDA, HW_TYPE_UNO, HW_TYPE_DOS] H7_DEVICES = [HW_TYPE_RED_PANDA] CLOCK_SOURCE_MODE_DISABLED = 0 CLOCK_SOURCE_MODE_FREE_RUNNING = 1 CLOCK_SOURCE_MODE_EXTERNAL_SYNC = 2 FLAG_HONDA_ALT_BRAKE = 1 FLAG_HONDA_BOSCH_LONG = 2 FLAG_HONDA_NIDEC_ALT = 4 FLAG_HYUNDAI_EV_GAS = 1 FLAG_HYUNDAI_HYBRID_GAS = 2 FLAG_HYUNDAI_LONG = 4 FLAG_TESLA_POWERTRAIN = 1 FLAG_TESLA_LONG_CONTROL = 2 def __init__(self, serial=None, claim=True): self._serial = serial self._handle = None self.connect(claim) self._mcu_type = self.get_mcu_type() def close(self): self._handle.close() self._handle = None def connect(self, claim=True, wait=False): if self._handle is not None: self.close() if self._serial == "WIFI": self._handle = WifiHandle() print("opening WIFI device") self.wifi = True else: context = usb1.USBContext() self._handle = None self.wifi = False while 1: try: for device in context.getDeviceList(skip_on_error=True): if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]: try: this_serial = device.getSerialNumber() except Exception: continue if self._serial is None or this_serial == self._serial: self._serial = this_serial print("opening device", self._serial, hex(device.getProductID())) self.bootstub = device.getProductID() == 0xddee self._handle = device.open() if sys.platform not in ["win32", "cygwin", "msys", "darwin"]: self._handle.setAutoDetachKernelDriver(True) if claim: self._handle.claimInterface(0) # self._handle.setInterfaceAltSetting(0, 0) # Issue in USB stack break except Exception as e: print("exception", e) traceback.print_exc() if not wait or self._handle is not None: break context = usb1.USBContext() # New context needed so new devices show up assert(self._handle is not None) self.health_version, self.can_version = self.get_packets_versions() print("connected") def reset(self, enter_bootstub=False, enter_bootloader=False): # reset try: if enter_bootloader: self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 0, 0, b'') else: if enter_bootstub: self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 1, 0, b'') else: self._handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'') except Exception: pass if not enter_bootloader: self.reconnect() def reconnect(self): self.close() time.sleep(1.0) success = False # wait up to 15 seconds for i in range(0, 15): try: self.connect() success = True break except Exception: print("reconnecting is taking %d seconds..." % (i + 1)) try: dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial, self._mcu_type)) dfu.recover() except Exception: pass time.sleep(1.0) if not success: raise Exception("reconnect failed") @staticmethod def flash_static(handle, code): # confirm flasher is present fr = handle.controlRead(Panda.REQUEST_IN, 0xb0, 0, 0, 0xc) assert fr[4:8] == b"\xde\xad\xd0\x0d" # unlock flash print("flash: unlocking") handle.controlWrite(Panda.REQUEST_IN, 0xb1, 0, 0, b'') # erase sectors 1 through 3 print("flash: erasing") for i in range(1, 4): handle.controlWrite(Panda.REQUEST_IN, 0xb2, i, 0, b'') # flash over EP2 STEP = 0x10 print("flash: flashing") for i in range(0, len(code), STEP): handle.bulkWrite(2, code[i:i + STEP]) # reset print("flash: resetting") try: handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'') except Exception: pass def flash(self, fn=DEFAULT_FW_FN, code=None, reconnect=True): if self._mcu_type == MCU_TYPE_H7 and fn == DEFAULT_FW_FN: fn = DEFAULT_H7_FW_FN print("flash: main version is " + self.get_version()) if not self.bootstub: self.reset(enter_bootstub=True) assert(self.bootstub) if code is None: with open(fn, "rb") as f: code = f.read() # get version print("flash: bootstub version is " + self.get_version()) # do flash Panda.flash_static(self._handle, code) # reconnect if reconnect: self.reconnect() def recover(self, timeout=None): self.reset(enter_bootstub=True) self.reset(enter_bootloader=True) t_start = time.time() while len(PandaDFU.list()) == 0: print("waiting for DFU...") time.sleep(0.1) if timeout is not None and (time.time() - t_start) > timeout: return False dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial, self._mcu_type)) dfu.recover() # reflash after recover self.connect(True, True) self.flash() return True @staticmethod def flash_ota_st(): ret = os.system("cd %s && make clean && make ota" % (os.path.join(BASEDIR, "board"))) time.sleep(1) return ret == 0 @staticmethod def flash_ota_wifi(release=False): release_str = "RELEASE=1" if release else "" ret = os.system("cd {} && make clean && {} make ota".format(os.path.join(BASEDIR, "boardesp"), release_str)) time.sleep(1) return ret == 0 @staticmethod def list(): context = usb1.USBContext() ret = [] try: for device in context.getDeviceList(skip_on_error=True): if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]: try: ret.append(device.getSerialNumber()) except Exception: continue except Exception: pass # TODO: detect if this is real # ret += ["WIFI"] return ret def call_control_api(self, msg): self._handle.controlWrite(Panda.REQUEST_OUT, msg, 0, 0, b'') # ******************* health ******************* @ensure_health_packet_version def health(self): dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd2, 0, 0, 44) a = struct.unpack(" 0: print(f"kline recv: 0x{ret.hex()}") echo += ret return bytes(echo) def kline_send(self, x, bus=2, checksum=True): self.kline_drain(bus=bus) if checksum: x += bytes([sum(x) % 0x100]) for i in range(0, len(x), 0xf): ts = x[i:i + 0xf] if DEBUG: print(f"kline send: 0x{ts.hex()}") self._handle.bulkWrite(2, bytes([bus]) + ts) echo = self.kline_ll_recv(len(ts), bus=bus) if echo != ts: print(f"**** ECHO ERROR {i} ****") print(f"0x{echo.hex()}") print(f"0x{ts.hex()}") assert echo == ts def kline_recv(self, bus=2, header_len=4): # read header (last byte is length) msg = self.kline_ll_recv(header_len, bus=bus) # read data (add one byte to length for checksum) msg += self.kline_ll_recv(msg[-1]+1, bus=bus) return msg def send_heartbeat(self): self._handle.controlWrite(Panda.REQUEST_OUT, 0xf3, 0, 0, b'') # disable heartbeat checks for use outside of openpilot # sending a heartbeat will reenable the checks def set_heartbeat_disabled(self): self._handle.controlWrite(Panda.REQUEST_OUT, 0xf8, 0, 0, b'') # ******************* RTC ******************* def set_datetime(self, dt): self._handle.controlWrite(Panda.REQUEST_OUT, 0xa1, int(dt.year), 0, b'') self._handle.controlWrite(Panda.REQUEST_OUT, 0xa2, int(dt.month), 0, b'') self._handle.controlWrite(Panda.REQUEST_OUT, 0xa3, int(dt.day), 0, b'') self._handle.controlWrite(Panda.REQUEST_OUT, 0xa4, int(dt.isoweekday()), 0, b'') self._handle.controlWrite(Panda.REQUEST_OUT, 0xa5, int(dt.hour), 0, b'') self._handle.controlWrite(Panda.REQUEST_OUT, 0xa6, int(dt.minute), 0, b'') self._handle.controlWrite(Panda.REQUEST_OUT, 0xa7, int(dt.second), 0, b'') def get_datetime(self): dat = self._handle.controlRead(Panda.REQUEST_IN, 0xa0, 0, 0, 8) a = struct.unpack("HBBBBBB", dat) return datetime.datetime(a[0], a[1], a[2], a[4], a[5], a[6]) # ******************* IR ******************* def set_ir_power(self, percentage): self._handle.controlWrite(Panda.REQUEST_OUT, 0xb0, int(percentage), 0, b'') # ******************* Fan ****************** def set_fan_power(self, percentage): self._handle.controlWrite(Panda.REQUEST_OUT, 0xb1, int(percentage), 0, b'') def get_fan_rpm(self): dat = self._handle.controlRead(Panda.REQUEST_IN, 0xb2, 0, 0, 2) a = struct.unpack("H", dat) return a[0] # ****************** Phone ***************** def set_phone_power(self, enabled): self._handle.controlWrite(Panda.REQUEST_OUT, 0xb3, int(enabled), 0, b'') # ************** Clock Source ************** def set_clock_source_mode(self, mode): self._handle.controlWrite(Panda.REQUEST_OUT, 0xf5, int(mode), 0, b'') # ****************** Siren ***************** def set_siren(self, enabled): self._handle.controlWrite(Panda.REQUEST_OUT, 0xf6, int(enabled), 0, b'') # ****************** Debug ***************** def set_green_led(self, enabled): self._handle.controlWrite(Panda.REQUEST_OUT, 0xf7, int(enabled), 0, b'')