Remove old panda subtree

This commit is contained in:
Vehicle Researcher
2020-01-15 14:04:40 -08:00
parent 0440535f64
commit ab415aa5fa
370 changed files with 0 additions and 69920 deletions

View File

@@ -1,655 +0,0 @@
# python library to interface with panda
import datetime
import binascii
import struct
import hashlib
import socket
import usb1
import os
import time
import traceback
import subprocess
from .dfu import PandaDFU
from .esptool import ESPROM, CesantaFlasher # noqa: F401
from .flash_release import flash_release # noqa: F401
from .update import ensure_st_up_to_date # noqa: F401
from .serial import PandaSerial # noqa: F401
from .isotp import isotp_send, isotp_recv
__version__ = '0.0.9'
BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
DEBUG = os.getenv("PANDADEBUG") is not None
# *** wifi mode ***
def build_st(target, mkfile="Makefile"):
from panda import BASEDIR
cmd = 'cd %s && make -f %s clean && make -f %s %s >/dev/null' % (os.path.join(BASEDIR, "board"), mkfile, mkfile, target)
try:
_ = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError:
#output = exception.output
#returncode = exception.returncode
raise
def parse_can_buffer(dat):
ret = []
for j in range(0, len(dat), 0x10):
ddat = dat[j:j+0x10]
f1, f2 = struct.unpack("II", ddat[0:8])
extended = 4
if f1 & extended:
address = f1 >> 3
else:
address = f1 >> 21
dddat = ddat[8:8+(f2&0xF)]
if DEBUG:
print(" R %x: %s" % (address, binascii.hexlify(dddat)))
ret.append((address, f2>>16, dddat, (f2>>4)&0xFF))
return ret
class PandaWifiStreaming(object):
def __init__(self, ip="192.168.0.10", port=1338):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setblocking(0)
self.ip = ip
self.port = port
self.kick()
def kick(self):
# must be called at least every 5 seconds
self.sock.sendto("hello", (self.ip, self.port))
def can_recv(self):
ret = []
while True:
try:
dat, addr = self.sock.recvfrom(0x200*0x10)
if addr == (self.ip, self.port):
ret += parse_can_buffer(dat)
except socket.error as e:
if e.errno != 35 and e.errno != 11:
traceback.print_exc()
break
return ret
# stupid tunneling of USB over wifi and SPI
class WifiHandle(object):
def __init__(self, ip="192.168.0.10", port=1337):
self.sock = socket.create_connection((ip, port))
def __recv(self):
ret = self.sock.recv(0x44)
length = struct.unpack("I", ret[0:4])[0]
return ret[4:4+length]
def controlWrite(self, request_type, request, value, index, data, timeout=0):
# ignore data in reply, panda doesn't use it
return self.controlRead(request_type, request, value, index, 0, timeout)
def controlRead(self, request_type, request, value, index, length, timeout=0):
self.sock.send(struct.pack("HHBBHHH", 0, 0, request_type, request, value, index, length))
return self.__recv()
def bulkWrite(self, endpoint, data, timeout=0):
if len(data) > 0x10:
raise ValueError("Data must not be longer than 0x10")
self.sock.send(struct.pack("HH", endpoint, len(data))+data)
self.__recv() # to /dev/null
def bulkRead(self, endpoint, length, timeout=0):
self.sock.send(struct.pack("HH", endpoint, 0))
return self.__recv()
def close(self):
self.sock.close()
# *** normal mode ***
class Panda(object):
# matches cereal.car.CarParams.SafetyModel
SAFETY_SILENT = 0
SAFETY_HONDA = 1
SAFETY_TOYOTA = 2
SAFETY_ELM327 = 3
SAFETY_GM = 4
SAFETY_HONDA_BOSCH = 5
SAFETY_FORD = 6
SAFETY_CADILLAC = 7
SAFETY_HYUNDAI = 8
SAFETY_CHRYSLER = 9
SAFETY_TESLA = 10
SAFETY_SUBARU = 11
SAFETY_MAZDA = 13
SAFETY_VOLKSWAGEN = 15
SAFETY_TOYOTA_IPAS = 16
SAFETY_ALLOUTPUT = 17
SAFETY_GM_ASCM = 18
SAFETY_NOOUTPUT = 19
SERIAL_DEBUG = 0
SERIAL_ESP = 1
SERIAL_LIN1 = 2
SERIAL_LIN2 = 3
GMLAN_CAN2 = 1
GMLAN_CAN3 = 2
REQUEST_IN = usb1.ENDPOINT_IN | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
REQUEST_OUT = usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
HW_TYPE_UNKNOWN = b'\x00'
HW_TYPE_WHITE_PANDA = b'\x01'
HW_TYPE_GREY_PANDA = b'\x02'
HW_TYPE_BLACK_PANDA = b'\x03'
HW_TYPE_PEDAL = b'\x04'
HW_TYPE_UNO = b'\x05'
def __init__(self, serial=None, claim=True):
self._serial = serial
self._handle = None
self.connect(claim)
def close(self):
self._handle.close()
self._handle = None
def connect(self, claim=True, wait=False):
if self._handle != None:
self.close()
if self._serial == "WIFI":
self._handle = WifiHandle()
print("opening WIFI device")
self.wifi = True
else:
context = usb1.USBContext()
self._handle = None
self.wifi = False
while 1:
try:
for device in context.getDeviceList(skip_on_error=True):
#print(device)
if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]:
try:
this_serial = device.getSerialNumber()
except Exception:
continue
if self._serial is None or this_serial == self._serial:
self._serial = this_serial
print("opening device", self._serial, hex(device.getProductID()))
time.sleep(1)
self.bootstub = device.getProductID() == 0xddee
self.legacy = (device.getbcdDevice() != 0x2300)
self._handle = device.open()
if claim:
self._handle.claimInterface(0)
#self._handle.setInterfaceAltSetting(0, 0) #Issue in USB stack
break
except Exception as e:
print("exception", e)
traceback.print_exc()
if wait == False or self._handle != None:
break
context = usb1.USBContext() #New context needed so new devices show up
assert(self._handle != None)
print("connected")
def reset(self, enter_bootstub=False, enter_bootloader=False):
# reset
try:
if enter_bootloader:
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 0, 0, b'')
else:
if enter_bootstub:
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 1, 0, b'')
else:
self._handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'')
except Exception:
pass
if not enter_bootloader:
self.reconnect()
def reconnect(self):
self.close()
time.sleep(1.0)
success = False
# wait up to 15 seconds
for i in range(0, 15):
try:
self.connect()
success = True
break
except Exception:
print("reconnecting is taking %d seconds..." % (i+1))
try:
dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial))
dfu.recover()
except Exception:
pass
time.sleep(1.0)
if not success:
raise Exception("reconnect failed")
@staticmethod
def flash_static(handle, code):
# confirm flasher is present
fr = handle.controlRead(Panda.REQUEST_IN, 0xb0, 0, 0, 0xc)
assert fr[4:8] == b"\xde\xad\xd0\x0d"
# unlock flash
print("flash: unlocking")
handle.controlWrite(Panda.REQUEST_IN, 0xb1, 0, 0, b'')
# erase sectors 1 through 3
print("flash: erasing")
for i in range(1, 4):
handle.controlWrite(Panda.REQUEST_IN, 0xb2, i, 0, b'')
# flash over EP2
STEP = 0x10
print("flash: flashing")
for i in range(0, len(code), STEP):
handle.bulkWrite(2, code[i:i+STEP])
# reset
print("flash: resetting")
try:
handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'')
except Exception:
pass
def flash(self, fn=None, code=None, reconnect=True):
print("flash: main version is " + self.get_version())
if not self.bootstub:
self.reset(enter_bootstub=True)
assert(self.bootstub)
if fn is None and code is None:
if self.legacy:
fn = "obj/comma.bin"
print("building legacy st code")
build_st(fn, "Makefile.legacy")
else:
fn = "obj/panda.bin"
print("building panda st code")
build_st(fn)
fn = os.path.join(BASEDIR, "board", fn)
if code is None:
with open(fn, "rb") as f:
code = f.read()
# get version
print("flash: bootstub version is " + self.get_version())
# do flash
Panda.flash_static(self._handle, code)
# reconnect
if reconnect:
self.reconnect()
def recover(self, timeout=None):
self.reset(enter_bootloader=True)
t_start = time.time()
while len(PandaDFU.list()) == 0:
print("waiting for DFU...")
time.sleep(0.1)
if timeout is not None and (time.time() - t_start) > timeout:
return False
dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial))
dfu.recover()
# reflash after recover
self.connect(True, True)
self.flash()
return True
@staticmethod
def flash_ota_st():
ret = os.system("cd %s && make clean && make ota" % (os.path.join(BASEDIR, "board")))
time.sleep(1)
return ret==0
@staticmethod
def flash_ota_wifi(release=False):
release_str = "RELEASE=1" if release else ""
ret = os.system("cd {} && make clean && {} make ota".format(os.path.join(BASEDIR, "boardesp"),release_str))
time.sleep(1)
return ret==0
@staticmethod
def list():
context = usb1.USBContext()
ret = []
try:
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]:
try:
ret.append(device.getSerialNumber())
except Exception:
continue
except Exception:
pass
# TODO: detect if this is real
#ret += ["WIFI"]
return ret
def call_control_api(self, msg):
self._handle.controlWrite(Panda.REQUEST_OUT, msg, 0, 0, b'')
# ******************* health *******************
def health(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd2, 0, 0, 37)
a = struct.unpack("IIIIIIIBBBBBBBBB", dat)
return {
"uptime": a[0],
"voltage": a[1],
"current": a[2],
"can_send_errs": a[3],
"can_fwd_errs": a[4],
"gmlan_send_errs": a[5],
"faults": a[6],
"ignition_line": a[7],
"ignition_can": a[8],
"controls_allowed": a[9],
"gas_interceptor_detected": a[10],
"car_harness_status": a[11],
"usb_power_mode": a[12],
"safety_mode": a[13],
"fault_status": a[14],
"power_save_enabled": a[15]
}
# ******************* control *******************
def enter_bootloader(self):
try:
self._handle.controlWrite(Panda.REQUEST_OUT, 0xd1, 0, 0, b'')
except Exception as e:
print(e)
pass
def get_version(self):
return self._handle.controlRead(Panda.REQUEST_IN, 0xd6, 0, 0, 0x40).decode('utf8')
@staticmethod
def get_signature_from_firmware(fn):
f = open(fn, 'rb')
f.seek(-128, 2) # Seek from end of file
return f.read(128)
def get_signature(self):
part_1 = self._handle.controlRead(Panda.REQUEST_IN, 0xd3, 0, 0, 0x40)
part_2 = self._handle.controlRead(Panda.REQUEST_IN, 0xd4, 0, 0, 0x40)
return part_1 + part_2
def get_type(self):
return self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40)
def is_white(self):
return self.get_type() == Panda.HW_TYPE_WHITE_PANDA
def is_grey(self):
return self.get_type() == Panda.HW_TYPE_GREY_PANDA
def is_black(self):
return self.get_type() == Panda.HW_TYPE_BLACK_PANDA
def is_uno(self):
return self.get_type() == Panda.HW_TYPE_UNO
def has_obd(self):
return (self.is_uno() or self.is_black())
def get_serial(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 0, 0, 0x20)
hashsig, calc_hash = dat[0x1c:], hashlib.sha1(dat[0:0x1c]).digest()[0:4]
assert(hashsig == calc_hash)
return [dat[0:0x10].decode("utf8"), dat[0x10:0x10+10].decode("utf8")]
def get_secret(self):
return self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 1, 0, 0x10)
# ******************* configuration *******************
def set_usb_power(self, on):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe6, int(on), 0, b'')
def set_power_save(self, power_save_enabled=0):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe7, int(power_save_enabled), 0, b'')
def set_esp_power(self, on):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xd9, int(on), 0, b'')
def esp_reset(self, bootmode=0):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xda, int(bootmode), 0, b'')
time.sleep(0.2)
def set_safety_mode(self, mode=SAFETY_SILENT):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdc, mode, 0, b'')
def set_can_forwarding(self, from_bus, to_bus):
# TODO: This feature may not work correctly with saturated buses
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdd, from_bus, to_bus, b'')
def set_gmlan(self, bus=2):
# TODO: check panda type
if bus is None:
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, 0, 0, b'')
elif bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, 1, bus, b'')
def set_obd(self, obd):
# TODO: check panda type
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, int(obd), 0, b'')
def set_can_loopback(self, enable):
# set can loopback mode for all buses
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe5, int(enable), 0, b'')
def set_can_enable(self, bus_num, enable):
# sets the can transciever enable pin
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf4, int(bus_num), int(enable), b'')
def set_can_speed_kbps(self, bus, speed):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xde, bus, int(speed*10), b'')
def set_uart_baud(self, uart, rate):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe4, uart, int(rate/300), b'')
def set_uart_parity(self, uart, parity):
# parity, 0=off, 1=even, 2=odd
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe2, uart, parity, b'')
def set_uart_callback(self, uart, install):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe3, uart, int(install), b'')
# ******************* can *******************
def can_send_many(self, arr):
snds = []
transmit = 1
extended = 4
for addr, _, dat, bus in arr:
assert len(dat) <= 8
if DEBUG:
print(" W %x: %s" % (addr, binascii.hexlify(dat)))
if addr >= 0x800:
rir = (addr << 3) | transmit | extended
else:
rir = (addr << 21) | transmit
snd = struct.pack("II", rir, len(dat) | (bus << 4)) + dat
snd = snd.ljust(0x10, b'\x00')
snds.append(snd)
while True:
try:
#print("DAT: %s"%b''.join(snds).__repr__())
if self.wifi:
for s in snds:
self._handle.bulkWrite(3, s)
else:
self._handle.bulkWrite(3, b''.join(snds))
break
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
print("CAN: BAD SEND MANY, RETRYING")
def can_send(self, addr, dat, bus):
self.can_send_many([[addr, None, dat, bus]])
def can_recv(self):
dat = bytearray()
while True:
try:
dat = self._handle.bulkRead(1, 0x10*256)
break
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
print("CAN: BAD RECV, RETRYING")
time.sleep(0.1)
return parse_can_buffer(dat)
def can_clear(self, bus):
"""Clears all messages from the specified internal CAN ringbuffer as
though it were drained.
Args:
bus (int): can bus number to clear a tx queue, or 0xFFFF to clear the
global can rx queue.
"""
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf1, bus, 0, b'')
# ******************* isotp *******************
def isotp_send(self, addr, dat, bus, recvaddr=None, subaddr=None):
return isotp_send(self, dat, addr, bus, recvaddr, subaddr)
def isotp_recv(self, addr, bus=0, sendaddr=None, subaddr=None):
return isotp_recv(self, addr, bus, sendaddr, subaddr)
# ******************* serial *******************
def serial_read(self, port_number):
ret = []
while 1:
lret = bytes(self._handle.controlRead(Panda.REQUEST_IN, 0xe0, port_number, 0, 0x40))
if len(lret) == 0:
break
ret.append(lret)
return b''.join(ret)
def serial_write(self, port_number, ln):
ret = 0
for i in range(0, len(ln), 0x20):
ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + ln[i:i+0x20])
return ret
def serial_clear(self, port_number):
"""Clears all messages (tx and rx) from the specified internal uart
ringbuffer as though it were drained.
Args:
port_number (int): port number of the uart to clear.
"""
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf2, port_number, 0, b'')
# ******************* kline *******************
# pulse low for wakeup
def kline_wakeup(self):
if DEBUG:
print("kline wakeup...")
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf0, 0, 0, b'')
if DEBUG:
print("kline wakeup done")
def kline_drain(self, bus=2):
# drain buffer
bret = bytearray()
while True:
ret = self._handle.controlRead(Panda.REQUEST_IN, 0xe0, bus, 0, 0x40)
if len(ret) == 0:
break
elif DEBUG:
print("kline drain: " + binascii.hexlify(ret))
bret += ret
return bytes(bret)
def kline_ll_recv(self, cnt, bus=2):
echo = bytearray()
while len(echo) != cnt:
ret = self._handle.controlRead(Panda.REQUEST_OUT, 0xe0, bus, 0, cnt-len(echo))
if DEBUG and len(ret) > 0:
print("kline recv: " + binascii.hexlify(ret))
echo += ret
return str(echo)
def kline_send(self, x, bus=2, checksum=True):
def get_checksum(dat):
result = 0
result += sum(map(ord, dat)) if isinstance(b'dat', str) else sum(dat)
result = -result
return struct.pack("B", result % 0x100)
self.kline_drain(bus=bus)
if checksum:
x += get_checksum(x)
for i in range(0, len(x), 0xf):
ts = x[i:i+0xf]
if DEBUG:
print("kline send: " + binascii.hexlify(ts))
self._handle.bulkWrite(2, bytes([bus]) + ts)
echo = self.kline_ll_recv(len(ts), bus=bus)
if echo != ts:
print("**** ECHO ERROR %d ****" % i)
print(binascii.hexlify(echo))
print(binascii.hexlify(ts))
assert echo == ts
def kline_recv(self, bus=2):
msg = self.kline_ll_recv(2, bus=bus)
msg += self.kline_ll_recv(ord(msg[1])-2, bus=bus)
return msg
def send_heartbeat(self):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf3, 0, 0, b'')
# ******************* RTC *******************
def set_datetime(self, dt):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa1, int(dt.year), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa2, int(dt.month), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa3, int(dt.day), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa4, int(dt.isoweekday()), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa5, int(dt.hour), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa6, int(dt.minute), 0, b'')
self._handle.controlWrite(Panda.REQUEST_OUT, 0xa7, int(dt.second), 0, b'')
def get_datetime(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xa0, 0, 0, 8)
a = struct.unpack("HBBBBBB", dat)
return datetime.datetime(a[0], a[1], a[2], a[4], a[5], a[6])
# ******************* IR *******************
def set_ir_power(self, percentage):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xb0, int(percentage), 0, b'')
# ******************* Fan ******************
def set_fan_power(self, percentage):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xb1, int(percentage), 0, b'')
def get_fan_rpm(self):
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xb2, 0, 0, 2)
a = struct.unpack("H", dat)
return a[0]
# ****************** Phone *****************
def set_phone_power(self, enabled):
self._handle.controlWrite(Panda.REQUEST_OUT, 0xb3, int(enabled), 0, b'')

View File

@@ -1,119 +0,0 @@
import os
import usb1
import struct
import binascii
# *** DFU mode ***
DFU_DNLOAD = 1
DFU_UPLOAD = 2
DFU_GETSTATUS = 3
DFU_CLRSTATUS = 4
DFU_ABORT = 6
class PandaDFU(object):
def __init__(self, dfu_serial):
context = usb1.USBContext()
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0x0483 and device.getProductID() == 0xdf11:
try:
this_dfu_serial = device.open().getASCIIStringDescriptor(3)
except Exception:
continue
if this_dfu_serial == dfu_serial or dfu_serial is None:
self._handle = device.open()
self.legacy = "07*128Kg" in self._handle.getASCIIStringDescriptor(4)
return
raise Exception("failed to open "+dfu_serial)
@staticmethod
def list():
context = usb1.USBContext()
dfu_serials = []
try:
for device in context.getDeviceList(skip_on_error=True):
if device.getVendorID() == 0x0483 and device.getProductID() == 0xdf11:
try:
dfu_serials.append(device.open().getASCIIStringDescriptor(3))
except Exception:
pass
except Exception:
pass
return dfu_serials
@staticmethod
def st_serial_to_dfu_serial(st):
if st == None or st == "none":
return None
uid_base = struct.unpack("H"*6, bytes.fromhex(st))
return binascii.hexlify(struct.pack("!HHH", uid_base[1] + uid_base[5], uid_base[0] + uid_base[4] + 0xA, uid_base[3])).upper().decode("utf-8")
def status(self):
while 1:
dat = self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6)
if dat[1] == 0:
break
def clear_status(self):
# Clear status
stat = self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6)
if stat[4] == 0xa:
self._handle.controlRead(0x21, DFU_CLRSTATUS, 0, 0, 0)
elif stat[4] == 0x9:
self._handle.controlWrite(0x21, DFU_ABORT, 0, 0, b"")
self.status()
stat = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6))
def erase(self, address):
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x41" + struct.pack("I", address))
self.status()
def program(self, address, dat, block_size=None):
if block_size == None:
block_size = len(dat)
# Set Address Pointer
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", address))
self.status()
# Program
dat += b"\xFF"*((block_size-len(dat)) % block_size)
for i in range(0, len(dat)//block_size):
ldat = dat[i*block_size:(i+1)*block_size]
print("programming %d with length %d" % (i, len(ldat)))
self._handle.controlWrite(0x21, DFU_DNLOAD, 2+i, 0, ldat)
self.status()
def program_bootstub(self, code_bootstub):
self.clear_status()
self.erase(0x8004000)
self.erase(0x8000000)
self.program(0x8000000, code_bootstub, 0x800)
self.reset()
def recover(self):
from panda import BASEDIR, build_st
if self.legacy:
fn = "obj/bootstub.comma.bin"
print("building legacy bootstub")
build_st(fn, "Makefile.legacy")
else:
fn = "obj/bootstub.panda.bin"
print("building panda bootstub")
build_st(fn)
fn = os.path.join(BASEDIR, "board", fn)
with open(fn, "rb") as f:
code = f.read()
self.program_bootstub(code)
def reset(self):
# **** Reset ****
self._handle.controlWrite(0x21, DFU_DNLOAD, 0, 0, b"\x21" + struct.pack("I", 0x8000000))
self.status()
try:
self._handle.controlWrite(0x21, DFU_DNLOAD, 2, 0, b"")
_ = str(self._handle.controlRead(0x21, DFU_GETSTATUS, 0, 0, 6))
except Exception:
pass

File diff suppressed because it is too large Load Diff

View File

@@ -1,95 +0,0 @@
#!/usr/bin/env python3
import sys
import time
import requests
import json
import io
def flash_release(path=None, st_serial=None):
from panda import Panda, PandaDFU, ESPROM, CesantaFlasher
from zipfile import ZipFile
def status(x):
print("\033[1;32;40m"+x+"\033[00m")
if st_serial == None:
# look for Panda
panda_list = Panda.list()
if len(panda_list) == 0:
raise Exception("panda not found, make sure it's connected and your user can access it")
elif len(panda_list) > 1:
raise Exception("Please only connect one panda")
st_serial = panda_list[0]
print("Using panda with serial %s" % st_serial)
if path == None:
print("Fetching latest firmware from github.com/commaai/panda-artifacts")
r = requests.get("https://raw.githubusercontent.com/commaai/panda-artifacts/master/latest.json")
url = json.loads(r.text)['url']
r = requests.get(url)
print("Fetching firmware from %s" % url)
path = io.StringIO(r.content)
zf = ZipFile(path)
zf.printdir()
version = zf.read("version")
status("0. Preparing to flash "+version)
code_bootstub = zf.read("bootstub.panda.bin")
code_panda = zf.read("panda.bin")
code_boot_15 = zf.read("boot_v1.5.bin")
code_boot_15 = code_boot_15[0:2] + "\x00\x30" + code_boot_15[4:]
code_user1 = zf.read("user1.bin")
code_user2 = zf.read("user2.bin")
# enter DFU mode
status("1. Entering DFU mode")
panda = Panda(st_serial)
panda.enter_bootloader()
time.sleep(1)
# program bootstub
status("2. Programming bootstub")
dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(st_serial))
dfu.program_bootstub(code_bootstub)
time.sleep(1)
# flash main code
status("3. Flashing main code")
panda = Panda(st_serial)
panda.flash(code=code_panda)
panda.close()
# flashing ESP
status("4. Flashing ESP (slow!)")
align = lambda x, sz=0x1000: x+"\xFF"*((sz-len(x)) % sz)
esp = ESPROM(st_serial)
esp.connect()
flasher = CesantaFlasher(esp, 230400)
flasher.flash_write(0x0, align(code_boot_15), True)
flasher.flash_write(0x1000, align(code_user1), True)
flasher.flash_write(0x81000, align(code_user2), True)
flasher.flash_write(0x3FE000, "\xFF"*0x1000)
flasher.boot_fw()
del flasher
del esp
time.sleep(1)
# check for connection
status("5. Verifying version")
panda = Panda(st_serial)
my_version = panda.get_version()
print("dongle id: %s" % panda.get_serial()[0])
print(my_version, "should be", version)
assert(str(version) == str(my_version))
# done!
status("6. Success!")
if __name__ == "__main__":
flash_release(*sys.argv[1:])

View File

@@ -1,137 +0,0 @@
import binascii
DEBUG = False
def msg(x):
if DEBUG:
print("S:", binascii.hexlify(x))
if len(x) <= 7:
ret = bytes([len(x)]) + x
else:
assert False
return ret.ljust(8, b"\x00")
kmsgs = []
def recv(panda, cnt, addr, nbus):
global kmsgs
ret = []
while len(ret) < cnt:
kmsgs += panda.can_recv()
nmsgs = []
for ids, ts, dat, bus in kmsgs:
if ids == addr and bus == nbus and len(ret) < cnt:
ret.append(dat)
else:
# leave around
nmsgs.append((ids, ts, dat, bus))
kmsgs = nmsgs[-256:]
return ret
def isotp_recv_subaddr(panda, addr, bus, sendaddr, subaddr):
msg = recv(panda, 1, addr, bus)[0]
# TODO: handle other subaddr also communicating
assert msg[0] == subaddr
if msg[1]&0xf0 == 0x10:
# first
tlen = ((msg[1] & 0xf) << 8) | msg[2]
dat = msg[3:]
# 0 block size?
CONTINUE = bytes([subaddr]) + b"\x30" + b"\x00"*6
panda.can_send(sendaddr, CONTINUE, bus)
idx = 1
for mm in recv(panda, (tlen-len(dat) + 5)//6, addr, bus):
assert mm[0] == subaddr
assert mm[1] == (0x20 | (idx&0xF))
dat += mm[2:]
idx += 1
elif msg[1]&0xf0 == 0x00:
# single
tlen = msg[1] & 0xf
dat = msg[2:]
else:
print(binascii.hexlify(msg))
assert False
return dat[0:tlen]
# **** import below this line ****
def isotp_send(panda, x, addr, bus=0, recvaddr=None, subaddr=None):
if recvaddr is None:
recvaddr = addr+8
if len(x) <= 7 and subaddr is None:
panda.can_send(addr, msg(x), bus)
elif len(x) <= 6 and subaddr is not None:
panda.can_send(addr, bytes([subaddr]) + msg(x)[0:7], bus)
else:
if subaddr:
ss = bytes([subaddr, 0x10 + (len(x) >> 8), len(x) & 0xFF]) + x[0:5]
x = x[5:]
else:
ss = bytes([0x10 + (len(x) >> 8), len(x) & 0xFF]) + x[0:6]
x = x[6:]
idx = 1
sends = []
while len(x) > 0:
if subaddr:
sends.append(((bytes([subaddr, 0x20 + (idx & 0xF)]) + x[0:6]).ljust(8, b"\x00")))
x = x[6:]
else:
sends.append(((bytes([0x20 + (idx & 0xF)]) + x[0:7]).ljust(8, b"\x00")))
x = x[7:]
idx += 1
# actually send
panda.can_send(addr, ss, bus)
rr = recv(panda, 1, recvaddr, bus)[0]
if rr.find(b"\x30\x01") != -1:
for s in sends[:-1]:
panda.can_send(addr, s, 0)
rr = recv(panda, 1, recvaddr, bus)[0]
panda.can_send(addr, sends[-1], 0)
else:
panda.can_send_many([(addr, None, s, 0) for s in sends])
def isotp_recv(panda, addr, bus=0, sendaddr=None, subaddr=None):
if sendaddr is None:
sendaddr = addr-8
if subaddr is not None:
dat = isotp_recv_subaddr(panda, addr, bus, sendaddr, subaddr)
else:
msg = recv(panda, 1, addr, bus)[0]
if msg[0] & 0xf0 == 0x10:
# first
tlen = ((msg[0] & 0xf) << 8) | msg[1]
dat = msg[2:]
# 0 block size?
CONTINUE = b"\x30" + b"\x00"*7
panda.can_send(sendaddr, CONTINUE, bus)
idx = 1
for mm in recv(panda, (tlen-len(dat) + 6)//7, addr, bus):
assert mm[0] == (0x20 | (idx&0xF))
dat += mm[1:]
idx += 1
elif msg[0] & 0xf0 == 0x00:
# single
tlen = msg[0] & 0xf
dat = msg[1:]
else:
assert False
dat = dat[0:tlen]
if DEBUG:
print("R:", binascii.hexlify(dat))
return dat

View File

@@ -1,27 +0,0 @@
# mimic a python serial port
class PandaSerial(object):
def __init__(self, panda, port, baud):
self.panda = panda
self.port = port
self.panda.set_uart_parity(self.port, 0)
self.panda.set_uart_baud(self.port, baud)
self.buf = b""
def read(self, l=1):
tt = self.panda.serial_read(self.port)
if len(tt) > 0:
#print "R: ", tt.encode("hex")
self.buf += tt
ret = self.buf[0:l]
self.buf = self.buf[l:]
return ret
def write(self, dat):
#print "W: ", dat.encode("hex")
#print ' pigeon_send("' + ''.join(map(lambda x: "\\x%02X" % ord(x), dat)) + '");'
return self.panda.serial_write(self.port, dat)
def close(self):
pass

View File

@@ -1,798 +0,0 @@
#!/usr/bin/env python3
import time
import struct
from typing import Callable, NamedTuple, Tuple, List
from enum import IntEnum
class SERVICE_TYPE(IntEnum):
DIAGNOSTIC_SESSION_CONTROL = 0x10
ECU_RESET = 0x11
SECURITY_ACCESS = 0x27
COMMUNICATION_CONTROL = 0x28
TESTER_PRESENT = 0x3E
ACCESS_TIMING_PARAMETER = 0x83
SECURED_DATA_TRANSMISSION = 0x84
CONTROL_DTC_SETTING = 0x85
RESPONSE_ON_EVENT = 0x86
LINK_CONTROL = 0x87
READ_DATA_BY_IDENTIFIER = 0x22
READ_MEMORY_BY_ADDRESS = 0x23
READ_SCALING_DATA_BY_IDENTIFIER = 0x24
READ_DATA_BY_PERIODIC_IDENTIFIER = 0x2A
DYNAMICALLY_DEFINE_DATA_IDENTIFIER = 0x2C
WRITE_DATA_BY_IDENTIFIER = 0x2E
WRITE_MEMORY_BY_ADDRESS = 0x3D
CLEAR_DIAGNOSTIC_INFORMATION = 0x14
READ_DTC_INFORMATION = 0x19
INPUT_OUTPUT_CONTROL_BY_IDENTIFIER = 0x2F
ROUTINE_CONTROL = 0x31
REQUEST_DOWNLOAD = 0x34
REQUEST_UPLOAD = 0x35
TRANSFER_DATA = 0x36
REQUEST_TRANSFER_EXIT = 0x37
class SESSION_TYPE(IntEnum):
DEFAULT = 1
PROGRAMMING = 2
EXTENDED_DIAGNOSTIC = 3
SAFETY_SYSTEM_DIAGNOSTIC = 4
class RESET_TYPE(IntEnum):
HARD = 1
KEY_OFF_ON = 2
SOFT = 3
ENABLE_RAPID_POWER_SHUTDOWN = 4
DISABLE_RAPID_POWER_SHUTDOWN = 5
class ACCESS_TYPE(IntEnum):
REQUEST_SEED = 1
SEND_KEY = 2
class CONTROL_TYPE(IntEnum):
ENABLE_RX_ENABLE_TX = 0
ENABLE_RX_DISABLE_TX = 1
DISABLE_RX_ENABLE_TX = 2
DISABLE_RX_DISABLE_TX = 3
class MESSAGE_TYPE(IntEnum):
NORMAL = 1
NETWORK_MANAGEMENT = 2
NORMAL_AND_NETWORK_MANAGEMENT = 3
class TIMING_PARAMETER_TYPE(IntEnum):
READ_EXTENDED_SET = 1
SET_TO_DEFAULT_VALUES = 2
READ_CURRENTLY_ACTIVE = 3
SET_TO_GIVEN_VALUES = 4
class DTC_SETTING_TYPE(IntEnum):
ON = 1
OFF = 2
class RESPONSE_EVENT_TYPE(IntEnum):
STOP_RESPONSE_ON_EVENT = 0
ON_DTC_STATUS_CHANGE = 1
ON_TIMER_INTERRUPT = 2
ON_CHANGE_OF_DATA_IDENTIFIER = 3
REPORT_ACTIVATED_EVENTS = 4
START_RESPONSE_ON_EVENT = 5
CLEAR_RESPONSE_ON_EVENT = 6
ON_COMPARISON_OF_VALUES = 7
class LINK_CONTROL_TYPE(IntEnum):
VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE = 1
VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE = 2
TRANSITION_BAUDRATE = 3
class BAUD_RATE_TYPE(IntEnum):
PC9600 = 1
PC19200 = 2
PC38400 = 3
PC57600 = 4
PC115200 = 5
CAN125000 = 16
CAN250000 = 17
CAN500000 = 18
CAN1000000 = 19
class DATA_IDENTIFIER_TYPE(IntEnum):
BOOT_SOFTWARE_IDENTIFICATION = 0xF180
APPLICATION_SOFTWARE_IDENTIFICATION = 0xF181
APPLICATION_DATA_IDENTIFICATION = 0xF182
BOOT_SOFTWARE_FINGERPRINT = 0xF183
APPLICATION_SOFTWARE_FINGERPRINT = 0xF184
APPLICATION_DATA_FINGERPRINT = 0xF185
ACTIVE_DIAGNOSTIC_SESSION = 0xF186
VEHICLE_MANUFACTURER_SPARE_PART_NUMBER = 0xF187
VEHICLE_MANUFACTURER_ECU_SOFTWARE_NUMBER = 0xF188
VEHICLE_MANUFACTURER_ECU_SOFTWARE_VERSION_NUMBER = 0xF189
SYSTEM_SUPPLIER_IDENTIFIER = 0xF18A
ECU_MANUFACTURING_DATE = 0xF18B
ECU_SERIAL_NUMBER = 0xF18C
SUPPORTED_FUNCTIONAL_UNITS = 0xF18D
VEHICLE_MANUFACTURER_KIT_ASSEMBLY_PART_NUMBER = 0xF18E
VIN = 0xF190
VEHICLE_MANUFACTURER_ECU_HARDWARE_NUMBER = 0xF191
SYSTEM_SUPPLIER_ECU_HARDWARE_NUMBER = 0xF192
SYSTEM_SUPPLIER_ECU_HARDWARE_VERSION_NUMBER = 0xF193
SYSTEM_SUPPLIER_ECU_SOFTWARE_NUMBER = 0xF194
SYSTEM_SUPPLIER_ECU_SOFTWARE_VERSION_NUMBER = 0xF195
EXHAUST_REGULATION_OR_TYPE_APPROVAL_NUMBER = 0xF196
SYSTEM_NAME_OR_ENGINE_TYPE = 0xF197
REPAIR_SHOP_CODE_OR_TESTER_SERIAL_NUMBER = 0xF198
PROGRAMMING_DATE = 0xF199
CALIBRATION_REPAIR_SHOP_CODE_OR_CALIBRATION_EQUIPMENT_SERIAL_NUMBER = 0xF19A
CALIBRATION_DATE = 0xF19B
CALIBRATION_EQUIPMENT_SOFTWARE_NUMBER = 0xF19C
ECU_INSTALLATION_DATE = 0xF19D
ODX_FILE = 0xF19E
ENTITY = 0xF19F
class TRANSMISSION_MODE_TYPE(IntEnum):
SEND_AT_SLOW_RATE = 1
SEND_AT_MEDIUM_RATE = 2
SEND_AT_FAST_RATE = 3
STOP_SENDING = 4
class DYNAMIC_DEFINITION_TYPE(IntEnum):
DEFINE_BY_IDENTIFIER = 1
DEFINE_BY_MEMORY_ADDRESS = 2
CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3
class DynamicSourceDefinition(NamedTuple):
data_identifier: int
position: int
memory_size: int
memory_address: int
class DTC_GROUP_TYPE(IntEnum):
EMISSIONS = 0x000000
ALL = 0xFFFFFF
class DTC_REPORT_TYPE(IntEnum):
NUMBER_OF_DTC_BY_STATUS_MASK = 0x01
DTC_BY_STATUS_MASK = 0x02
DTC_SNAPSHOT_IDENTIFICATION = 0x03
DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER = 0x04
DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER = 0x05
DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x06
NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD = 0x07
DTC_BY_SEVERITY_MASK_RECORD = 0x08
SEVERITY_INFORMATION_OF_DTC = 0x09
SUPPORTED_DTC = 0x0A
FIRST_TEST_FAILED_DTC = 0x0B
FIRST_CONFIRMED_DTC = 0x0C
MOST_RECENT_TEST_FAILED_DTC = 0x0D
MOST_RECENT_CONFIRMED_DTC = 0x0E
MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x0F
MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER = 0x10
NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK = 0x11
NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x12
EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK = 0x13
DTC_FAULT_DETECTION_COUNTER = 0x14
DTC_WITH_PERMANENT_STATUS = 0x15
class DTC_STATUS_MASK_TYPE(IntEnum):
TEST_FAILED = 0x01
TEST_FAILED_THIS_OPERATION_CYCLE = 0x02
PENDING_DTC = 0x04
CONFIRMED_DTC = 0x08
TEST_NOT_COMPLETED_SINCE_LAST_CLEAR = 0x10
TEST_FAILED_SINCE_LAST_CLEAR = 0x20
TEST_NOT_COMPLETED_THIS_OPERATION_CYCLE = 0x40
WARNING_INDICATOR_REQUESTED = 0x80
ALL = 0xFF
class DTC_SEVERITY_MASK_TYPE(IntEnum):
MAINTENANCE_ONLY = 0x20
CHECK_AT_NEXT_HALT = 0x40
CHECK_IMMEDIATELY = 0x80
ALL = 0xE0
class CONTROL_PARAMETER_TYPE(IntEnum):
RETURN_CONTROL_TO_ECU = 0
RESET_TO_DEFAULT = 1
FREEZE_CURRENT_STATE = 2
SHORT_TERM_ADJUSTMENT = 3
class ROUTINE_CONTROL_TYPE(IntEnum):
START = 1
STOP = 2
REQUEST_RESULTS = 3
class ROUTINE_IDENTIFIER_TYPE(IntEnum):
ERASE_MEMORY = 0xFF00
CHECK_PROGRAMMING_DEPENDENCIES = 0xFF01
ERASE_MIRROR_MEMORY_DTCS = 0xFF02
class MessageTimeoutError(Exception):
pass
class NegativeResponseError(Exception):
def __init__(self, message, service_id, error_code):
super().__init__()
self.message = message
self.service_id = service_id
self.error_code = error_code
def __str__(self):
return self.message
class InvalidServiceIdError(Exception):
pass
class InvalidSubFunctioneError(Exception):
pass
_negative_response_codes = {
0x00: 'positive response',
0x10: 'general reject',
0x11: 'service not supported',
0x12: 'sub-function not supported',
0x13: 'incorrect message length or invalid format',
0x14: 'response too long',
0x21: 'busy repeat request',
0x22: 'conditions not correct',
0x24: 'request sequence error',
0x25: 'no response from subnet component',
0x26: 'failure prevents execution of requested action',
0x31: 'request out of range',
0x33: 'security access denied',
0x35: 'invalid key',
0x36: 'exceed numebr of attempts',
0x37: 'required time delay not expired',
0x70: 'upload download not accepted',
0x71: 'transfer data suspended',
0x72: 'general programming failure',
0x73: 'wrong block sequence counter',
0x78: 'request correctly received - response pending',
0x7e: 'sub-function not supported in active session',
0x7f: 'service not supported in active session',
0x81: 'rpm too high',
0x82: 'rpm too low',
0x83: 'engine is running',
0x84: 'engine is not running',
0x85: 'engine run time too low',
0x86: 'temperature too high',
0x87: 'temperature too low',
0x88: 'vehicle speed too high',
0x89: 'vehicle speed too low',
0x8a: 'throttle/pedal too high',
0x8b: 'throttle/pedal too low',
0x8c: 'transmission not in neutral',
0x8d: 'transmission not in gear',
0x8f: 'brake switch(es) not closed',
0x90: 'shifter lever not in park',
0x91: 'torque converter clutch locked',
0x92: 'voltage too high',
0x93: 'voltage too low',
}
class CanClient():
def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, debug: bool=False):
self.tx = can_send
self.rx = can_recv
self.tx_addr = tx_addr
self.rx_addr = rx_addr
self.bus = bus
self.debug = debug
def _recv_filter(self, bus, addr):
# handle functionl addresses (switch to first addr to respond)
if self.tx_addr == 0x7DF:
is_response = addr >= 0x7E8 and addr <= 0x7EF
if is_response:
if self.debug: print(f"switch to physical addr {hex(addr)}")
self.tx_addr = addr-8
self.rx_addr = addr
return is_response
if self.tx_addr == 0x18DB33F1:
is_response = addr >= 0x18DAF100 and addr <= 0x18DAF1FF
if is_response:
if self.debug: print(f"switch to physical addr {hex(addr)}")
self.tx_addr = 0x18DA00F1 + (addr<<8 & 0xFF00)
self.rx_addr = addr
return bus == self.bus and addr == self.rx_addr
def recv(self, drain=False) -> List[bytes]:
msg_array = []
while True:
msgs = self.rx()
if drain:
if self.debug: print("CAN-RX: drain - {}".format(len(msgs)))
else:
for rx_addr, rx_ts, rx_data, rx_bus in msgs or []:
if self._recv_filter(rx_bus, rx_addr) and len(rx_data) > 0:
rx_data = bytes(rx_data) # convert bytearray to bytes
if self.debug: print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}")
msg_array.append(rx_data)
# break when non-full buffer is processed
if len(msgs) < 254:
return msg_array
def send(self, msgs: List[bytes], delay: float=0) -> None:
first = True
for msg in msgs:
if delay and not first:
if self.debug: print(f"CAN-TX: delay - {delay}")
time.sleep(delay)
if self.debug: print(f"CAN-TX: {hex(self.tx_addr)} - 0x{bytes.hex(msg)}")
self.tx(self.tx_addr, msg, self.bus)
first = False
class IsoTpMessage():
def __init__(self, can_client: CanClient, timeout: float=1, debug: bool=False):
self._can_client = can_client
self.timeout = timeout
self.debug = debug
def send(self, dat: bytes) -> None:
# throw away any stale data
self._can_client.recv(drain=True)
self.tx_dat = dat
self.tx_len = len(dat)
self.tx_idx = 0
self.tx_done = False
self.rx_dat = b""
self.rx_len = 0
self.rx_idx = 0
self.rx_done = False
if self.debug: print(f"ISO-TP: REQUEST - 0x{bytes.hex(self.tx_dat)}")
self._tx_first_frame()
def _tx_first_frame(self) -> None:
if self.tx_len < 8:
# single frame (send all bytes)
if self.debug: print("ISO-TP: TX - single frame")
msg = (bytes([self.tx_len]) + self.tx_dat).ljust(8, b"\x00")
self.tx_done = True
else:
# first frame (send first 6 bytes)
if self.debug: print("ISO-TP: TX - first frame")
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:6]).ljust(8, b"\x00")
self._can_client.send([msg])
def recv(self) -> bytes:
start_time = time.time()
try:
while True:
for msg in self._can_client.recv():
self._isotp_rx_next(msg)
if self.tx_done and self.rx_done:
return self.rx_dat
# no timeout indicates non-blocking
if self.timeout == 0:
return None
if time.time() - start_time > self.timeout:
raise MessageTimeoutError("timeout waiting for response")
finally:
if self.debug and self.rx_dat: print(f"ISO-TP: RESPONSE - 0x{bytes.hex(self.rx_dat)}")
def _isotp_rx_next(self, rx_data: bytes) -> None:
# single rx_frame
if rx_data[0] >> 4 == 0x0:
self.rx_len = rx_data[0] & 0xFF
self.rx_dat = rx_data[1:1+self.rx_len]
self.rx_idx = 0
self.rx_done = True
if self.debug: print(f"ISO-TP: RX - single frame - idx={self.rx_idx} done={self.rx_done}")
return
# first rx_frame
if rx_data[0] >> 4 == 0x1:
self.rx_len = ((rx_data[0] & 0x0F) << 8) + rx_data[1]
self.rx_dat = rx_data[2:]
self.rx_idx = 0
self.rx_done = False
if self.debug: print(f"ISO-TP: RX - first frame - idx={self.rx_idx} done={self.rx_done}")
if self.debug: print(f"ISO-TP: TX - flow control continue")
# send flow control message (send all bytes)
msg = b"\x30\x00\x00".ljust(8, b"\x00")
self._can_client.send([msg])
return
# consecutive rx frame
if rx_data[0] >> 4 == 0x2:
assert self.rx_done == False, "isotp - rx: consecutive frame with no active frame"
self.rx_idx += 1
assert self.rx_idx & 0xF == rx_data[0] & 0xF, "isotp - rx: invalid consecutive frame index"
rx_size = self.rx_len - len(self.rx_dat)
self.rx_dat += rx_data[1:1+min(rx_size, 7)]
if self.rx_len == len(self.rx_dat):
self.rx_done = True
if self.debug: print(f"ISO-TP: RX - consecutive frame - idx={self.rx_idx} done={self.rx_done}")
return
# flow control
if rx_data[0] >> 4 == 0x3:
assert self.tx_done == False, "isotp - rx: flow control with no active frame"
assert rx_data[0] != 0x32, "isotp - rx: flow-control overflow/abort"
assert rx_data[0] == 0x30 or rx_data[0] == 0x31, "isotp - rx: flow-control transfer state indicator invalid"
if rx_data[0] == 0x30:
if self.debug: print("ISO-TP: RX - flow control continue")
delay_ts = rx_data[2] & 0x7F
# scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1
delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000.
delay_sec = delay_ts / delay_div
# first frame = 6 bytes, each consecutive frame = 7 bytes
start = 6 + self.tx_idx * 7
count = rx_data[1]
end = start + count * 7 if count > 0 else self.tx_len
tx_msgs = []
for i in range(start, end, 7):
self.tx_idx += 1
# consecutive tx messages
msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i+7]).ljust(8, b"\x00")
tx_msgs.append(msg)
# send consecutive tx messages
self._can_client.send(tx_msgs, delay=delay_sec)
if end >= self.tx_len:
self.tx_done = True
if self.debug: print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}")
elif rx_data[0] == 0x31:
# wait (do nothing until next flow control message)
if self.debug: print("ISO-TP: TX - flow control wait")
FUNCTIONAL_ADDRS = [0x7DF, 0x18DB33F1]
def get_rx_addr_for_tx_addr(tx_addr):
if tx_addr in FUNCTIONAL_ADDRS:
return None
if tx_addr < 0xFFF8:
# standard 11 bit response addr (add 8)
return tx_addr + 8
if tx_addr > 0x10000000 and tx_addr < 0xFFFFFFFF:
# standard 29 bit response addr (flip last two bytes)
return (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF)
raise ValueError("invalid tx_addr: {}".format(tx_addr))
class UdsClient():
def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: float=1, debug: bool=False):
self.bus = bus
self.tx_addr = tx_addr
self.rx_addr = rx_addr if rx_addr is not None else get_rx_addr_for_tx_addr(tx_addr)
self.timeout = timeout
self.debug = debug
self._can_client = CanClient(panda.can_send, panda.can_recv, self.tx_addr, self.rx_addr, self.bus, debug=self.debug)
# generic uds request
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes:
req = bytes([service_type])
if subfunction is not None:
req += bytes([subfunction])
if data is not None:
req += data
# send request, wait for response
isotp_msg = IsoTpMessage(self._can_client, self.timeout, self.debug)
isotp_msg.send(req)
while True:
resp = isotp_msg.recv()
resp_sid = resp[0] if len(resp) > 0 else None
# negative response
if resp_sid == 0x7F:
service_id = resp[1] if len(resp) > 1 else -1
try:
service_desc = SERVICE_TYPE(service_id).name
except BaseException:
service_desc = 'NON_STANDARD_SERVICE'
error_code = resp[2] if len(resp) > 2 else -1
try:
error_desc = _negative_response_codes[error_code]
except BaseException:
error_desc = resp[3:]
# wait for another message if response pending
if error_code == 0x78:
if self.debug: print("UDS-RX: response pending")
continue
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code)
# positive response
if service_type+0x40 != resp_sid:
resp_sid_hex = hex(resp_sid) if resp_sid is not None else None
raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex))
if subfunction is not None:
resp_sfn = resp[1] if len(resp) > 1 else None
if subfunction != resp_sfn:
resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn_hex)))
# return data (exclude service id and sub-function id)
return resp[(1 if subfunction is None else 2):]
# services
def diagnostic_session_control(self, session_type: SESSION_TYPE):
self._uds_request(SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type)
def ecu_reset(self, reset_type: RESET_TYPE):
resp = self._uds_request(SERVICE_TYPE.ECU_RESET, subfunction=reset_type)
power_down_time = None
if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN:
power_down_time = resp[0]
return power_down_time
def security_access(self, access_type: ACCESS_TYPE, security_key: bytes=None):
request_seed = access_type % 2 != 0
if request_seed and security_key is not None:
raise ValueError('security_key not allowed')
if not request_seed and security_key is None:
raise ValueError('security_key is missing')
resp = self._uds_request(SERVICE_TYPE.SECURITY_ACCESS, subfunction=access_type, data=security_key)
if request_seed:
security_seed = resp
return security_seed
def communication_control(self, control_type: CONTROL_TYPE, message_type: MESSAGE_TYPE):
data = bytes([message_type])
self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)
def tester_present(self, ):
self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)
def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: bytes=None):
write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES
read_values = (
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_EXTENDED_SET
)
if not write_custom_values and parameter_values is not None:
raise ValueError('parameter_values not allowed')
if write_custom_values and parameter_values is None:
raise ValueError('parameter_values is missing')
resp = self._uds_request(SERVICE_TYPE.ACCESS_TIMING_PARAMETER, subfunction=timing_parameter_type, data=parameter_values)
if read_values:
# TODO: parse response into values?
parameter_values = resp
return parameter_values
def secured_data_transmission(self, data: bytes):
# TODO: split data into multiple input parameters?
resp = self._uds_request(SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data)
# TODO: parse response into multiple output values?
return resp
def control_dtc_setting(self, dtc_setting_type: DTC_SETTING_TYPE):
self._uds_request(SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type)
def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, event_type_record: int, service_response_record: int):
if store_event:
response_event_type |= 0x20
# TODO: split record parameters into arrays
data = bytes([window_time, event_type_record, service_response_record])
resp = self._uds_request(SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data)
if response_event_type == RESPONSE_EVENT_TYPE.REPORT_ACTIVATED_EVENTS:
return {
"num_of_activated_events": resp[0],
"data": resp[1:], # TODO: parse the reset of response
}
return {
"num_of_identified_events": resp[0],
"event_window_time": resp[1],
"data": resp[2:], # TODO: parse the reset of response
}
def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE=None):
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
# baud_rate_type = BAUD_RATE_TYPE
data = bytes([baud_rate_type])
elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE:
# baud_rate_type = custom value (3 bytes big-endian)
data = struct.pack('!I', baud_rate_type)[1:]
else:
data = None
self._uds_request(SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data)
def read_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE):
# TODO: support list of identifiers
data = struct.pack('!H', data_identifier_type)
resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]
def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = bytes([memory_size_bytes<<4 | memory_address_bytes])
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
return resp
def read_scaling_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE):
data = struct.pack('!H', data_identifier_type)
resp = self._uds_request(SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:] # TODO: parse the response
def read_data_by_periodic_identifier(self, transmission_mode_type: TRANSMISSION_MODE_TYPE, periodic_data_identifier: int):
# TODO: support list of identifiers
data = bytes([transmission_mode_type, periodic_data_identifier])
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int=4, memory_size_bytes: int=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = struct.pack('!H', dynamic_data_identifier)
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
for s in source_definitions:
data += struct.pack('!H', s["data_identifier"]) + bytes([s["position"], s["memory_size"]])
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
data += bytes([memory_size_bytes<<4 | memory_address_bytes])
for s in source_definitions:
if s["memory_address"] >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s["memory_address"]))
data += struct.pack('!I', s["memory_address"])[4-memory_address_bytes:]
if s["memory_size"] >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(s["memory_size"]))
data += struct.pack('!I', s["memory_size"])[4-memory_size_bytes:]
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER:
pass
else:
raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type)))
self._uds_request(SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)
def write_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, data_record: bytes):
data = struct.pack('!H', data_identifier_type) + data_record
resp = self._uds_request(SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int=4, memory_size_bytes: int=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = bytes([memory_size_bytes<<4 | memory_address_bytes])
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
data += data_record
self._uds_request(SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data)
def clear_diagnostic_information(self, dtc_group_type: DTC_GROUP_TYPE):
data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes
self._uds_request(SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data)
def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int=0xFFFFFF, dtc_snapshot_record_num: int=0xFF, dtc_extended_record_num: int=0xFF):
data = b''
# dtc_status_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK:
data += bytes([dtc_status_mask_type])
# dtc_mask_record
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.SEVERITY_INFORMATION_OF_DTC:
data += struct.pack('!I', dtc_mask_record)[1:] # 3 bytes
# dtc_snapshot_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER:
data += ord(dtc_snapshot_record_num)
# dtc_extended_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER:
data += bytes([dtc_extended_record_num])
# dtc_severity_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or \
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD:
data += bytes([dtc_severity_mask_type, dtc_status_mask_type])
resp = self._uds_request(SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)
# TODO: parse response
return resp
def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE, control_option_record: bytes, control_enable_mask_record: bytes=b''):
data = struct.pack('!H', data_identifier_type) + bytes([control_parameter_type]) + control_option_record + control_enable_mask_record
resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]
def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes=b''):
data = struct.pack('!H', routine_identifier_type) + routine_option_record
resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != routine_identifier_type:
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id)))
return resp[2:]
def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00):
data = bytes([data_format])
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += bytes([memory_size_bytes<<4 | memory_address_bytes])
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else:
raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len))
return max_num_bytes # max number of bytes per transfer data request
def request_upload(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00):
data = bytes([data_format])
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += bytes([memory_size_bytes<<4 | memory_address_bytes])
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else:
raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len))
return max_num_bytes # max number of bytes per transfer data request
def transfer_data(self, block_sequence_count: int, data: bytes=b''):
data = bytes([block_sequence_count]) + data
resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = resp[0] if len(resp) > 0 else None
if resp_id != block_sequence_count:
raise ValueError('invalid block_sequence_count: {}'.format(resp_id))
return resp[1:]
def request_transfer_exit(self):
self._uds_request(SERVICE_TYPE.REQUEST_TRANSFER_EXIT, subfunction=None)

View File

@@ -1,45 +0,0 @@
#!/usr/bin/env python3
import os
import time
def ensure_st_up_to_date():
from panda import Panda, PandaDFU, BASEDIR
with open(os.path.join(BASEDIR, "VERSION")) as f:
repo_version = f.read()
repo_version += "-EON" if os.path.isfile('/EON') else "-DEV"
panda = None
panda_dfu = None
while 1:
# break on normal mode Panda
panda_list = Panda.list()
if len(panda_list) > 0:
panda = Panda(panda_list[0])
break
# flash on DFU mode Panda
panda_dfu = PandaDFU.list()
if len(panda_dfu) > 0:
panda_dfu = PandaDFU(panda_dfu[0])
panda_dfu.recover()
print("waiting for board...")
time.sleep(1)
if panda.bootstub or not panda.get_version().startswith(repo_version):
panda.flash()
if panda.bootstub:
panda.recover()
assert(not panda.bootstub)
version = str(panda.get_version())
print("%s should be %s" % (version, repo_version))
assert(version.startswith(repo_version))
if __name__ == "__main__":
ensure_st_up_to_date()