mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-19 18:13:57 +08:00
2253dd3 fix volt ign detect 3b299d7 add ignition and refactor af9af6d Merge pull request #110 from Jamezz/volt 13e850e more correct f295063 add new define to tests fec9758 gate that with debug 5516ebf one more ifdef cac7b31 only panda has float 938d474 fpu enable ffbf0c7 cleaner de30f27 Revert "need f to not be double" 4142acf need f to not be double 3eb15c8 refactor to share code a4c8b64 change to O2 to fix make recover 711fd11 Enable compiler optimizations, fix things it breaks 2e6f774 block IPAS in main toyota safety mode e7a2b3a add ipas tests 894572c fix tests 367c9ad add safety toyota ipas 95919b9 Bounty: panda high quality CAN autobaud (#96) 6557cd2 Toyota Safety: allow controls only on rising edge of cruise_engaged 02c1ddf Revert "added steer override check when IPAS is in control (#106)" 9f925ba Fix the merge mess 23d3833 Merge from comma upstream a0cc51a Undo safety mode override ea1c1dc make wlan interface name generic 6dbd8c9 Implement WebUSB and upgrade WinUSB to 2.0 (#107) 4fc83a5 Add safety hook for ignition and have GM use gear selector to determine ignition 52b2ac0 switch from travis to circleci 48e2374 build panda esp image 065572a circleci build stm image 7a1f319 add panda python package test and fix safety test 021dde7 move saftey test helper files into safety folder ce0545f add ci files 6a3307c no LIN over ELM 7d21acb added steer override check when IPAS is in control (#106) 1c88caf Safety code testing (#104) f4efd1f Merge pull request #101 from adhintz/master c02618b Merge pull request #102 from quillford/master 1ba5f8a added link to wiki for user scripts de2b19e add support for multiple buses to can_unique and can_bittransition output data in sorted order. git-subtree-dir: panda git-subtree-split: 2253dd3c48e21abb82fe161d6f58237490111206
538 lines
16 KiB
Python
538 lines
16 KiB
Python
# python library to interface with panda
|
|
from __future__ import print_function
|
|
import binascii
|
|
import struct
|
|
import hashlib
|
|
import socket
|
|
import usb1
|
|
import os
|
|
import time
|
|
import traceback
|
|
from dfu import PandaDFU
|
|
from esptool import ESPROM, CesantaFlasher
|
|
from flash_release import flash_release
|
|
from update import ensure_st_up_to_date
|
|
from serial import PandaSerial
|
|
from isotp import isotp_send, isotp_recv
|
|
|
|
__version__ = '0.0.7'
|
|
|
|
BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
|
|
|
|
DEBUG = os.getenv("PANDADEBUG") is not None
|
|
|
|
# *** wifi mode ***
|
|
|
|
def build_st(target, mkfile="Makefile"):
|
|
from panda import BASEDIR
|
|
assert(os.system('cd %s && make -f %s clean && make -f %s %s >/dev/null' % (os.path.join(BASEDIR, "board"), mkfile, mkfile, target)) == 0)
|
|
|
|
def parse_can_buffer(dat):
|
|
ret = []
|
|
for j in range(0, len(dat), 0x10):
|
|
ddat = dat[j:j+0x10]
|
|
f1, f2 = struct.unpack("II", ddat[0:8])
|
|
extended = 4
|
|
if f1 & extended:
|
|
address = f1 >> 3
|
|
else:
|
|
address = f1 >> 21
|
|
dddat = ddat[8:8+(f2&0xF)]
|
|
if DEBUG:
|
|
print(" R %x: %s" % (address, str(dddat).encode("hex")))
|
|
ret.append((address, f2>>16, dddat, (f2>>4)&0xFF))
|
|
return ret
|
|
|
|
class PandaWifiStreaming(object):
|
|
def __init__(self, ip="192.168.0.10", port=1338):
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.sock.setblocking(0)
|
|
self.ip = ip
|
|
self.port = port
|
|
self.kick()
|
|
|
|
def kick(self):
|
|
# must be called at least every 5 seconds
|
|
self.sock.sendto("hello", (self.ip, self.port))
|
|
|
|
def can_recv(self):
|
|
ret = []
|
|
while True:
|
|
try:
|
|
dat, addr = self.sock.recvfrom(0x200*0x10)
|
|
if addr == (self.ip, self.port):
|
|
ret += parse_can_buffer(dat)
|
|
except socket.error as e:
|
|
if e.errno != 35 and e.errno != 11:
|
|
traceback.print_exc()
|
|
break
|
|
return ret
|
|
|
|
# stupid tunneling of USB over wifi and SPI
|
|
class WifiHandle(object):
|
|
def __init__(self, ip="192.168.0.10", port=1337):
|
|
self.sock = socket.create_connection((ip, port))
|
|
|
|
def __recv(self):
|
|
ret = self.sock.recv(0x44)
|
|
length = struct.unpack("I", ret[0:4])[0]
|
|
return ret[4:4+length]
|
|
|
|
def controlWrite(self, request_type, request, value, index, data, timeout=0):
|
|
# ignore data in reply, panda doesn't use it
|
|
return self.controlRead(request_type, request, value, index, 0, timeout)
|
|
|
|
def controlRead(self, request_type, request, value, index, length, timeout=0):
|
|
self.sock.send(struct.pack("HHBBHHH", 0, 0, request_type, request, value, index, length))
|
|
return self.__recv()
|
|
|
|
def bulkWrite(self, endpoint, data, timeout=0):
|
|
if len(data) > 0x10:
|
|
raise ValueError("Data must not be longer than 0x10")
|
|
self.sock.send(struct.pack("HH", endpoint, len(data))+data)
|
|
self.__recv() # to /dev/null
|
|
|
|
def bulkRead(self, endpoint, length, timeout=0):
|
|
self.sock.send(struct.pack("HH", endpoint, 0))
|
|
return self.__recv()
|
|
|
|
def close(self):
|
|
self.sock.close()
|
|
|
|
# *** normal mode ***
|
|
|
|
class Panda(object):
|
|
SAFETY_NOOUTPUT = 0
|
|
SAFETY_HONDA = 1
|
|
SAFETY_TOYOTA = 2
|
|
SAFETY_HONDA_BOSCH = 4
|
|
SAFETY_TOYOTA_NOLIMITS = 0x1336
|
|
SAFETY_ALLOUTPUT = 0x1337
|
|
SAFETY_ELM327 = 0xE327
|
|
|
|
SERIAL_DEBUG = 0
|
|
SERIAL_ESP = 1
|
|
SERIAL_LIN1 = 2
|
|
SERIAL_LIN2 = 3
|
|
|
|
GMLAN_CAN2 = 1
|
|
GMLAN_CAN3 = 2
|
|
|
|
REQUEST_IN = usb1.ENDPOINT_IN | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
|
|
REQUEST_OUT = usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
|
|
|
|
def __init__(self, serial=None, claim=True):
|
|
self._serial = serial
|
|
self._handle = None
|
|
self.connect(claim)
|
|
|
|
def close(self):
|
|
self._handle.close()
|
|
self._handle = None
|
|
|
|
def connect(self, claim=True, wait=False):
|
|
if self._handle != None:
|
|
self.close()
|
|
|
|
if self._serial == "WIFI":
|
|
self._handle = WifiHandle()
|
|
print("opening WIFI device")
|
|
self.wifi = True
|
|
else:
|
|
context = usb1.USBContext()
|
|
self._handle = None
|
|
self.wifi = False
|
|
|
|
while 1:
|
|
try:
|
|
for device in context.getDeviceList(skip_on_error=True):
|
|
#print(device)
|
|
if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]:
|
|
try:
|
|
this_serial = device.getSerialNumber()
|
|
except Exception:
|
|
continue
|
|
if self._serial is None or this_serial == self._serial:
|
|
self._serial = this_serial
|
|
print("opening device", self._serial, hex(device.getProductID()))
|
|
time.sleep(1)
|
|
self.bootstub = device.getProductID() == 0xddee
|
|
self.legacy = (device.getbcdDevice() != 0x2300)
|
|
self._handle = device.open()
|
|
if claim:
|
|
self._handle.claimInterface(0)
|
|
#self._handle.setInterfaceAltSetting(0, 0) #Issue in USB stack
|
|
break
|
|
except Exception as e:
|
|
print("exception", e)
|
|
traceback.print_exc()
|
|
if wait == False or self._handle != None:
|
|
break
|
|
assert(self._handle != None)
|
|
print("connected")
|
|
|
|
def reset(self, enter_bootstub=False, enter_bootloader=False):
|
|
# reset
|
|
try:
|
|
if enter_bootloader:
|
|
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 0, 0, b'')
|
|
else:
|
|
if enter_bootstub:
|
|
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 1, 0, b'')
|
|
else:
|
|
self._handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'')
|
|
except Exception:
|
|
pass
|
|
if not enter_bootloader:
|
|
self.reconnect()
|
|
|
|
def reconnect(self):
|
|
self.close()
|
|
time.sleep(1.0)
|
|
success = False
|
|
# wait up to 15 seconds
|
|
for i in range(0, 15):
|
|
try:
|
|
self.connect()
|
|
success = True
|
|
break
|
|
except Exception:
|
|
print("reconnecting is taking %d seconds..." % (i+1))
|
|
try:
|
|
dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial))
|
|
dfu.recover()
|
|
except Exception:
|
|
pass
|
|
time.sleep(1.0)
|
|
if not success:
|
|
raise Exception("reconnect failed")
|
|
|
|
@staticmethod
|
|
def flash_static(handle, code):
|
|
# confirm flasher is present
|
|
fr = handle.controlRead(Panda.REQUEST_IN, 0xb0, 0, 0, 0xc)
|
|
assert fr[4:8] == "\xde\xad\xd0\x0d"
|
|
|
|
# unlock flash
|
|
print("flash: unlocking")
|
|
handle.controlWrite(Panda.REQUEST_IN, 0xb1, 0, 0, b'')
|
|
|
|
# erase sectors 1 and 2
|
|
print("flash: erasing")
|
|
handle.controlWrite(Panda.REQUEST_IN, 0xb2, 1, 0, b'')
|
|
handle.controlWrite(Panda.REQUEST_IN, 0xb2, 2, 0, b'')
|
|
|
|
# flash over EP2
|
|
STEP = 0x10
|
|
print("flash: flashing")
|
|
for i in range(0, len(code), STEP):
|
|
handle.bulkWrite(2, code[i:i+STEP])
|
|
|
|
# reset
|
|
print("flash: resetting")
|
|
try:
|
|
handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'')
|
|
except Exception:
|
|
pass
|
|
|
|
def flash(self, fn=None, code=None, reconnect=True):
|
|
if not self.bootstub:
|
|
self.reset(enter_bootstub=True)
|
|
assert(self.bootstub)
|
|
|
|
if fn is None and code is None:
|
|
if self.legacy:
|
|
fn = "obj/comma.bin"
|
|
print("building legacy st code")
|
|
build_st(fn, "Makefile.legacy")
|
|
else:
|
|
fn = "obj/panda.bin"
|
|
print("building panda st code")
|
|
build_st(fn)
|
|
fn = os.path.join(BASEDIR, "board", fn)
|
|
|
|
if code is None:
|
|
with open(fn) as f:
|
|
code = f.read()
|
|
|
|
# get version
|
|
print("flash: version is "+self.get_version())
|
|
|
|
# do flash
|
|
Panda.flash_static(self._handle, code)
|
|
|
|
# reconnect
|
|
if reconnect:
|
|
self.reconnect()
|
|
|
|
def recover(self):
|
|
self.reset(enter_bootloader=True)
|
|
while len(PandaDFU.list()) == 0:
|
|
print("waiting for DFU...")
|
|
time.sleep(0.1)
|
|
|
|
dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial))
|
|
dfu.recover()
|
|
|
|
# reflash after recover
|
|
self.connect(True, True)
|
|
self.flash()
|
|
|
|
@staticmethod
|
|
def flash_ota_st():
|
|
ret = os.system("cd %s && make clean && make ota" % (os.path.join(BASEDIR, "board")))
|
|
time.sleep(1)
|
|
return ret==0
|
|
|
|
@staticmethod
|
|
def flash_ota_wifi():
|
|
ret = os.system("cd %s && make clean && make ota" % (os.path.join(BASEDIR, "boardesp")))
|
|
time.sleep(1)
|
|
return ret==0
|
|
|
|
@staticmethod
|
|
def list():
|
|
context = usb1.USBContext()
|
|
ret = []
|
|
try:
|
|
for device in context.getDeviceList(skip_on_error=True):
|
|
if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]:
|
|
try:
|
|
ret.append(device.getSerialNumber())
|
|
except Exception:
|
|
continue
|
|
except Exception:
|
|
pass
|
|
# TODO: detect if this is real
|
|
#ret += ["WIFI"]
|
|
return ret
|
|
|
|
def call_control_api(self, msg):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, msg, 0, 0, b'')
|
|
|
|
# ******************* health *******************
|
|
|
|
def health(self):
|
|
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd2, 0, 0, 13)
|
|
a = struct.unpack("IIBBBBB", dat)
|
|
return {"voltage": a[0], "current": a[1],
|
|
"started": a[2], "controls_allowed": a[3],
|
|
"gas_interceptor_detected": a[4],
|
|
"started_signal_detected": a[5],
|
|
"started_alt": a[6]}
|
|
|
|
# ******************* control *******************
|
|
|
|
def enter_bootloader(self):
|
|
try:
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xd1, 0, 0, b'')
|
|
except Exception as e:
|
|
print(e)
|
|
pass
|
|
|
|
def get_version(self):
|
|
return self._handle.controlRead(Panda.REQUEST_IN, 0xd6, 0, 0, 0x40)
|
|
|
|
def is_grey(self):
|
|
ret = self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40)
|
|
return ret == "\x01"
|
|
|
|
def get_serial(self):
|
|
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 0, 0, 0x20)
|
|
hashsig, calc_hash = dat[0x1c:], hashlib.sha1(dat[0:0x1c]).digest()[0:4]
|
|
assert(hashsig == calc_hash)
|
|
return [dat[0:0x10], dat[0x10:0x10+10]]
|
|
|
|
def get_secret(self):
|
|
return self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 1, 0, 0x10)
|
|
|
|
# ******************* configuration *******************
|
|
|
|
def set_usb_power(self, on):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe6, int(on), 0, b'')
|
|
|
|
def set_esp_power(self, on):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xd9, int(on), 0, b'')
|
|
|
|
def esp_reset(self, bootmode=0):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xda, int(bootmode), 0, b'')
|
|
time.sleep(0.2)
|
|
|
|
def set_safety_mode(self, mode=SAFETY_NOOUTPUT):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdc, mode, 0, b'')
|
|
|
|
def set_can_forwarding(self, from_bus, to_bus):
|
|
# TODO: This feature may not work correctly with saturated buses
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdd, from_bus, to_bus, b'')
|
|
|
|
def set_gmlan(self, bus=2):
|
|
if bus is None:
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, 0, 0, b'')
|
|
elif bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, 1, bus, b'')
|
|
|
|
def set_can_loopback(self, enable):
|
|
# set can loopback mode for all buses
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe5, int(enable), 0, b'')
|
|
|
|
def set_can_speed_kbps(self, bus, speed):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xde, bus, int(speed*10), b'')
|
|
|
|
def set_uart_baud(self, uart, rate):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe4, uart, rate/300, b'')
|
|
|
|
def set_uart_parity(self, uart, parity):
|
|
# parity, 0=off, 1=even, 2=odd
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe2, uart, parity, b'')
|
|
|
|
def set_uart_callback(self, uart, install):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe3, uart, int(install), b'')
|
|
|
|
# ******************* can *******************
|
|
|
|
def can_send_many(self, arr):
|
|
snds = []
|
|
transmit = 1
|
|
extended = 4
|
|
for addr, _, dat, bus in arr:
|
|
assert len(dat) <= 8
|
|
if DEBUG:
|
|
print(" W %x: %s" % (addr, dat.encode("hex")))
|
|
if addr >= 0x800:
|
|
rir = (addr << 3) | transmit | extended
|
|
else:
|
|
rir = (addr << 21) | transmit
|
|
snd = struct.pack("II", rir, len(dat) | (bus << 4)) + dat
|
|
snd = snd.ljust(0x10, b'\x00')
|
|
snds.append(snd)
|
|
|
|
while True:
|
|
try:
|
|
#print("DAT: %s"%b''.join(snds).__repr__())
|
|
if self.wifi:
|
|
for s in snds:
|
|
self._handle.bulkWrite(3, s)
|
|
else:
|
|
self._handle.bulkWrite(3, b''.join(snds))
|
|
break
|
|
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
|
|
print("CAN: BAD SEND MANY, RETRYING")
|
|
|
|
def can_send(self, addr, dat, bus):
|
|
self.can_send_many([[addr, None, dat, bus]])
|
|
|
|
def can_recv(self):
|
|
dat = bytearray()
|
|
while True:
|
|
try:
|
|
dat = self._handle.bulkRead(1, 0x10*256)
|
|
break
|
|
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
|
|
print("CAN: BAD RECV, RETRYING")
|
|
return parse_can_buffer(dat)
|
|
|
|
def can_clear(self, bus):
|
|
"""Clears all messages from the specified internal CAN ringbuffer as
|
|
though it were drained.
|
|
|
|
Args:
|
|
bus (int): can bus number to clear a tx queue, or 0xFFFF to clear the
|
|
global can rx queue.
|
|
|
|
"""
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf1, bus, 0, b'')
|
|
|
|
# ******************* isotp *******************
|
|
|
|
def isotp_send(self, addr, dat, bus, recvaddr=None, subaddr=None):
|
|
return isotp_send(self, dat, addr, bus, recvaddr, subaddr)
|
|
|
|
def isotp_recv(self, addr, bus=0, sendaddr=None, subaddr=None):
|
|
return isotp_recv(self, addr, bus, sendaddr, subaddr)
|
|
|
|
# ******************* serial *******************
|
|
|
|
def serial_read(self, port_number):
|
|
ret = []
|
|
while 1:
|
|
lret = bytes(self._handle.controlRead(Panda.REQUEST_IN, 0xe0, port_number, 0, 0x40))
|
|
if len(lret) == 0:
|
|
break
|
|
ret.append(lret)
|
|
return b''.join(ret)
|
|
|
|
def serial_write(self, port_number, ln):
|
|
ret = 0
|
|
for i in range(0, len(ln), 0x20):
|
|
ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + ln[i:i+0x20])
|
|
return ret
|
|
|
|
def serial_clear(self, port_number):
|
|
"""Clears all messages (tx and rx) from the specified internal uart
|
|
ringbuffer as though it were drained.
|
|
|
|
Args:
|
|
port_number (int): port number of the uart to clear.
|
|
|
|
"""
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf2, port_number, 0, b'')
|
|
|
|
# ******************* kline *******************
|
|
|
|
# pulse low for wakeup
|
|
def kline_wakeup(self):
|
|
if DEBUG:
|
|
print("kline wakeup...")
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf0, 0, 0, b'')
|
|
if DEBUG:
|
|
print("kline wakeup done")
|
|
|
|
def kline_drain(self, bus=2):
|
|
# drain buffer
|
|
bret = bytearray()
|
|
while True:
|
|
ret = self._handle.controlRead(Panda.REQUEST_IN, 0xe0, bus, 0, 0x40)
|
|
if len(ret) == 0:
|
|
break
|
|
elif DEBUG:
|
|
print("kline drain: "+str(ret).encode("hex"))
|
|
bret += ret
|
|
return bytes(bret)
|
|
|
|
def kline_ll_recv(self, cnt, bus=2):
|
|
echo = bytearray()
|
|
while len(echo) != cnt:
|
|
ret = str(self._handle.controlRead(Panda.REQUEST_OUT, 0xe0, bus, 0, cnt-len(echo)))
|
|
if DEBUG and len(ret) > 0:
|
|
print("kline recv: "+ret.encode("hex"))
|
|
echo += ret
|
|
return str(echo)
|
|
|
|
def kline_send(self, x, bus=2, checksum=True):
|
|
def get_checksum(dat):
|
|
result = 0
|
|
result += sum(map(ord, dat)) if isinstance(b'dat', str) else sum(dat)
|
|
result = -result
|
|
return struct.pack("B", result % 0x100)
|
|
|
|
self.kline_drain(bus=bus)
|
|
if checksum:
|
|
x += get_checksum(x)
|
|
for i in range(0, len(x), 0xf):
|
|
ts = x[i:i+0xf]
|
|
if DEBUG:
|
|
print("kline send: "+ts.encode("hex"))
|
|
self._handle.bulkWrite(2, chr(bus).encode()+ts)
|
|
echo = self.kline_ll_recv(len(ts), bus=bus)
|
|
if echo != ts:
|
|
print("**** ECHO ERROR %d ****" % i)
|
|
print(binascii.hexlify(echo))
|
|
print(binascii.hexlify(ts))
|
|
assert echo == ts
|
|
|
|
def kline_recv(self, bus=2):
|
|
msg = self.kline_ll_recv(2, bus=bus)
|
|
msg += self.kline_ll_recv(ord(msg[1])-2, bus=bus)
|
|
return msg
|
|
|