mirror of
https://github.com/sunnypilot/sunnypilot.git
synced 2026-02-20 06:44:04 +08:00
b42db6d Merge pull request #82 from commaai/uart_dma
fd68f86 smallr
be99ffc ok that doesn't hurt i think
a9f6bf0 this
8b7e849 working now
7fa4808 froze up, maybe thats the fix
1465aa4 ok, it's fixed
915cd84 ugh, ok, need that
fd05376 comment out debug
37c5263 big fifo
497f069 dma is all critical, no interrupts
7c34afe minor change
743d244 high baud rate works
5d2a4ba v1.0.6
fbf1390 Toyota Safety: fix in input param
6c01d09 Toyota: less torque error allowance to meet Corolla acceptable behavior
07c01b2 Toyota safety: using input param
4410a59 add safety param support
fc81fc1 uart dma in progress
65fb2b2 grey panda query, 1.0.5
f415c9a grey panda detection
b68957e add pandadebug support
b5e4962 leave msgs around in isotp
0acce2d add recvaddr support
3fc38f4 set bootmode with power
d4c052a make that work
21f8195 fix panda serial write
af74aa9 from python import
git-subtree-dir: panda
git-subtree-split: b42db6dc082fb13ef5ac63ed197a63e179651ef6
old-commit-hash: 96f8e5158e
496 lines
14 KiB
Python
496 lines
14 KiB
Python
# python library to interface with panda
|
|
from __future__ import print_function
|
|
import binascii
|
|
import struct
|
|
import hashlib
|
|
import socket
|
|
import usb1
|
|
import os
|
|
import time
|
|
import traceback
|
|
from dfu import PandaDFU
|
|
from esptool import ESPROM, CesantaFlasher
|
|
from flash_release import flash_release
|
|
from update import ensure_st_up_to_date
|
|
from serial import PandaSerial
|
|
|
|
__version__ = '0.0.6'
|
|
|
|
BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
|
|
|
|
DEBUG = os.getenv("PANDADEBUG") is not None
|
|
|
|
# *** wifi mode ***
|
|
|
|
def build_st(target, mkfile="Makefile"):
|
|
from panda import BASEDIR
|
|
assert(os.system('cd %s && make -f %s clean && make -f %s %s >/dev/null' % (os.path.join(BASEDIR, "board"), mkfile, mkfile, target)) == 0)
|
|
|
|
def parse_can_buffer(dat):
|
|
ret = []
|
|
for j in range(0, len(dat), 0x10):
|
|
ddat = dat[j:j+0x10]
|
|
f1, f2 = struct.unpack("II", ddat[0:8])
|
|
extended = 4
|
|
if f1 & extended:
|
|
address = f1 >> 3
|
|
else:
|
|
address = f1 >> 21
|
|
dddat = ddat[8:8+(f2&0xF)]
|
|
if DEBUG:
|
|
print(" R %x: %s" % (address, str(dddat).encode("hex")))
|
|
ret.append((address, f2>>16, dddat, (f2>>4)&0xFF))
|
|
return ret
|
|
|
|
class PandaWifiStreaming(object):
|
|
def __init__(self, ip="192.168.0.10", port=1338):
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.sock.setblocking(0)
|
|
self.ip = ip
|
|
self.port = port
|
|
self.kick()
|
|
|
|
def kick(self):
|
|
# must be called at least every 5 seconds
|
|
self.sock.sendto("hello", (self.ip, self.port))
|
|
|
|
def can_recv(self):
|
|
ret = []
|
|
while True:
|
|
try:
|
|
dat, addr = self.sock.recvfrom(0x200*0x10)
|
|
if addr == (self.ip, self.port):
|
|
ret += parse_can_buffer(dat)
|
|
except socket.error as e:
|
|
if e.errno != 35 and e.errno != 11:
|
|
traceback.print_exc()
|
|
break
|
|
return ret
|
|
|
|
# stupid tunneling of USB over wifi and SPI
|
|
class WifiHandle(object):
|
|
def __init__(self, ip="192.168.0.10", port=1337):
|
|
self.sock = socket.create_connection((ip, port))
|
|
|
|
def __recv(self):
|
|
ret = self.sock.recv(0x44)
|
|
length = struct.unpack("I", ret[0:4])[0]
|
|
return ret[4:4+length]
|
|
|
|
def controlWrite(self, request_type, request, value, index, data, timeout=0):
|
|
# ignore data in reply, panda doesn't use it
|
|
return self.controlRead(request_type, request, value, index, 0, timeout)
|
|
|
|
def controlRead(self, request_type, request, value, index, length, timeout=0):
|
|
self.sock.send(struct.pack("HHBBHHH", 0, 0, request_type, request, value, index, length))
|
|
return self.__recv()
|
|
|
|
def bulkWrite(self, endpoint, data, timeout=0):
|
|
if len(data) > 0x10:
|
|
raise ValueError("Data must not be longer than 0x10")
|
|
self.sock.send(struct.pack("HH", endpoint, len(data))+data)
|
|
self.__recv() # to /dev/null
|
|
|
|
def bulkRead(self, endpoint, length, timeout=0):
|
|
self.sock.send(struct.pack("HH", endpoint, 0))
|
|
return self.__recv()
|
|
|
|
def close(self):
|
|
self.sock.close()
|
|
|
|
# *** normal mode ***
|
|
|
|
class Panda(object):
|
|
SAFETY_NOOUTPUT = 0
|
|
SAFETY_HONDA = 1
|
|
SAFETY_TOYOTA = 2
|
|
SAFETY_TOYOTA_NOLIMITS = 0x1336
|
|
SAFETY_ALLOUTPUT = 0x1337
|
|
SAFETY_ELM327 = 0xE327
|
|
|
|
SERIAL_DEBUG = 0
|
|
SERIAL_ESP = 1
|
|
SERIAL_LIN1 = 2
|
|
SERIAL_LIN2 = 3
|
|
|
|
GMLAN_CAN2 = 1
|
|
GMLAN_CAN3 = 2
|
|
|
|
REQUEST_IN = usb1.ENDPOINT_IN | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
|
|
REQUEST_OUT = usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
|
|
|
|
def __init__(self, serial=None, claim=True):
|
|
self._serial = serial
|
|
self._handle = None
|
|
self.connect(claim)
|
|
|
|
def close(self):
|
|
self._handle.close()
|
|
self._handle = None
|
|
|
|
def connect(self, claim=True, wait=False):
|
|
if self._handle != None:
|
|
self.close()
|
|
|
|
if self._serial == "WIFI":
|
|
self._handle = WifiHandle()
|
|
print("opening WIFI device")
|
|
self.wifi = True
|
|
else:
|
|
context = usb1.USBContext()
|
|
self._handle = None
|
|
self.wifi = False
|
|
|
|
while 1:
|
|
try:
|
|
for device in context.getDeviceList(skip_on_error=True):
|
|
#print(device)
|
|
if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]:
|
|
try:
|
|
this_serial = device.getSerialNumber()
|
|
except Exception:
|
|
continue
|
|
if self._serial is None or this_serial == self._serial:
|
|
self._serial = this_serial
|
|
print("opening device", self._serial, hex(device.getProductID()))
|
|
self.bootstub = device.getProductID() == 0xddee
|
|
self.legacy = (device.getbcdDevice() != 0x2300)
|
|
self._handle = device.open()
|
|
if claim:
|
|
self._handle.claimInterface(0)
|
|
#self._handle.setInterfaceAltSetting(0, 0) #Issue in USB stack
|
|
break
|
|
except Exception as e:
|
|
print("exception", e)
|
|
traceback.print_exc()
|
|
if wait == False or self._handle != None:
|
|
break
|
|
assert(self._handle != None)
|
|
print("connected")
|
|
|
|
def reset(self, enter_bootstub=False, enter_bootloader=False):
|
|
# reset
|
|
try:
|
|
if enter_bootloader:
|
|
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 0, 0, b'')
|
|
else:
|
|
if enter_bootstub:
|
|
self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 1, 0, b'')
|
|
else:
|
|
self._handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'')
|
|
except Exception:
|
|
pass
|
|
if not enter_bootloader:
|
|
self.close()
|
|
time.sleep(1.0)
|
|
success = False
|
|
# wait up to 15 seconds
|
|
for i in range(0, 15):
|
|
try:
|
|
self.connect()
|
|
success = True
|
|
break
|
|
except Exception:
|
|
print("reconnecting is taking %d seconds..." % (i+1))
|
|
try:
|
|
dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial))
|
|
dfu.recover()
|
|
except Exception:
|
|
pass
|
|
time.sleep(1.0)
|
|
if not success:
|
|
raise Exception("reset failed")
|
|
|
|
def flash(self, fn=None, code=None):
|
|
if not self.bootstub:
|
|
self.reset(enter_bootstub=True)
|
|
assert(self.bootstub)
|
|
|
|
if fn is None and code is None:
|
|
if self.legacy:
|
|
fn = "obj/comma.bin"
|
|
print("building legacy st code")
|
|
build_st(fn, "Makefile.legacy")
|
|
else:
|
|
fn = "obj/panda.bin"
|
|
print("building panda st code")
|
|
build_st(fn)
|
|
fn = os.path.join(BASEDIR, "board", fn)
|
|
|
|
if code is None:
|
|
with open(fn) as f:
|
|
code = f.read()
|
|
|
|
# get version
|
|
print("flash: version is "+self.get_version())
|
|
|
|
# confirm flasher is present
|
|
fr = self._handle.controlRead(Panda.REQUEST_IN, 0xb0, 0, 0, 0xc)
|
|
assert fr[4:8] == "\xde\xad\xd0\x0d"
|
|
|
|
# unlock flash
|
|
print("flash: unlocking")
|
|
self._handle.controlWrite(Panda.REQUEST_IN, 0xb1, 0, 0, b'')
|
|
|
|
# erase sectors 1 and 2
|
|
print("flash: erasing")
|
|
self._handle.controlWrite(Panda.REQUEST_IN, 0xb2, 1, 0, b'')
|
|
self._handle.controlWrite(Panda.REQUEST_IN, 0xb2, 2, 0, b'')
|
|
|
|
# flash over EP2
|
|
STEP = 0x10
|
|
print("flash: flashing")
|
|
for i in range(0, len(code), STEP):
|
|
self._handle.bulkWrite(2, code[i:i+STEP])
|
|
|
|
# reset
|
|
print("flash: resetting")
|
|
self.reset()
|
|
|
|
def recover(self):
|
|
self.reset(enter_bootloader=True)
|
|
while len(PandaDFU.list()) == 0:
|
|
print("waiting for DFU...")
|
|
time.sleep(0.1)
|
|
|
|
dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial))
|
|
dfu.recover()
|
|
|
|
# reflash after recover
|
|
self.connect(True, True)
|
|
self.flash()
|
|
|
|
@staticmethod
|
|
def flash_ota_st():
|
|
ret = os.system("cd %s && make clean && make ota" % (os.path.join(BASEDIR, "board")))
|
|
time.sleep(1)
|
|
return ret==0
|
|
|
|
@staticmethod
|
|
def flash_ota_wifi():
|
|
ret = os.system("cd %s && make clean && make ota" % (os.path.join(BASEDIR, "boardesp")))
|
|
time.sleep(1)
|
|
return ret==0
|
|
|
|
@staticmethod
|
|
def list():
|
|
context = usb1.USBContext()
|
|
ret = []
|
|
try:
|
|
for device in context.getDeviceList(skip_on_error=True):
|
|
if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]:
|
|
try:
|
|
ret.append(device.getSerialNumber())
|
|
except Exception:
|
|
continue
|
|
except Exception:
|
|
pass
|
|
# TODO: detect if this is real
|
|
#ret += ["WIFI"]
|
|
return ret
|
|
|
|
def call_control_api(self, msg):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, msg, 0, 0, b'')
|
|
|
|
# ******************* health *******************
|
|
|
|
def health(self):
|
|
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd2, 0, 0, 13)
|
|
a = struct.unpack("IIBBBBB", dat)
|
|
return {"voltage": a[0], "current": a[1],
|
|
"started": a[2], "controls_allowed": a[3],
|
|
"gas_interceptor_detected": a[4],
|
|
"started_signal_detected": a[5],
|
|
"started_alt": a[6]}
|
|
|
|
# ******************* control *******************
|
|
|
|
def enter_bootloader(self):
|
|
try:
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xd1, 0, 0, b'')
|
|
except Exception as e:
|
|
print(e)
|
|
pass
|
|
|
|
def get_version(self):
|
|
return self._handle.controlRead(Panda.REQUEST_IN, 0xd6, 0, 0, 0x40)
|
|
|
|
def get_serial(self):
|
|
dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 0, 0, 0x20)
|
|
hashsig, calc_hash = dat[0x1c:], hashlib.sha1(dat[0:0x1c]).digest()[0:4]
|
|
assert(hashsig == calc_hash)
|
|
return [dat[0:0x10], dat[0x10:0x10+10]]
|
|
|
|
def get_secret(self):
|
|
return self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 1, 0, 0x10)
|
|
|
|
# ******************* configuration *******************
|
|
|
|
def set_usb_power(self, on):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe6, int(on), 0, b'')
|
|
|
|
def set_esp_power(self, on):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xd9, int(on), 0, b'')
|
|
|
|
def esp_reset(self, bootmode=0):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xda, int(bootmode), 0, b'')
|
|
time.sleep(0.2)
|
|
|
|
def set_safety_mode(self, mode=SAFETY_NOOUTPUT):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdc, mode, 0, b'')
|
|
|
|
def set_can_forwarding(self, from_bus, to_bus):
|
|
# TODO: This feature may not work correctly with saturated buses
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdd, from_bus, to_bus, b'')
|
|
|
|
def set_gmlan(self, bus=2):
|
|
if bus is None:
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, 0, 0, b'')
|
|
elif bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, 1, bus, b'')
|
|
|
|
def set_can_loopback(self, enable):
|
|
# set can loopback mode for all buses
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe5, int(enable), 0, b'')
|
|
|
|
def set_can_speed_kbps(self, bus, speed):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xde, bus, int(speed*10), b'')
|
|
|
|
def set_uart_baud(self, uart, rate):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe4, uart, rate/300, b'')
|
|
|
|
def set_uart_parity(self, uart, parity):
|
|
# parity, 0=off, 1=even, 2=odd
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe2, uart, parity, b'')
|
|
|
|
def set_uart_callback(self, uart, install):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe3, uart, int(install), b'')
|
|
|
|
# ******************* can *******************
|
|
|
|
def can_send_many(self, arr):
|
|
snds = []
|
|
transmit = 1
|
|
extended = 4
|
|
for addr, _, dat, bus in arr:
|
|
assert len(dat) <= 8
|
|
if DEBUG:
|
|
print(" W %x: %s" % (addr, dat.encode("hex")))
|
|
if addr >= 0x800:
|
|
rir = (addr << 3) | transmit | extended
|
|
else:
|
|
rir = (addr << 21) | transmit
|
|
snd = struct.pack("II", rir, len(dat) | (bus << 4)) + dat
|
|
snd = snd.ljust(0x10, b'\x00')
|
|
snds.append(snd)
|
|
|
|
while True:
|
|
try:
|
|
#print("DAT: %s"%b''.join(snds).__repr__())
|
|
if self.wifi:
|
|
for s in snds:
|
|
self._handle.bulkWrite(3, s)
|
|
else:
|
|
self._handle.bulkWrite(3, b''.join(snds))
|
|
break
|
|
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
|
|
print("CAN: BAD SEND MANY, RETRYING")
|
|
|
|
def can_send(self, addr, dat, bus):
|
|
self.can_send_many([[addr, None, dat, bus]])
|
|
|
|
def can_recv(self):
|
|
dat = bytearray()
|
|
while True:
|
|
try:
|
|
dat = self._handle.bulkRead(1, 0x10*256)
|
|
break
|
|
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
|
|
print("CAN: BAD RECV, RETRYING")
|
|
return parse_can_buffer(dat)
|
|
|
|
def can_clear(self, bus):
|
|
"""Clears all messages from the specified internal CAN ringbuffer as
|
|
though it were drained.
|
|
|
|
Args:
|
|
bus (int): can bus number to clear a tx queue, or 0xFFFF to clear the
|
|
global can rx queue.
|
|
|
|
"""
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf1, bus, 0, b'')
|
|
|
|
# ******************* serial *******************
|
|
|
|
def serial_read(self, port_number):
|
|
ret = []
|
|
while 1:
|
|
lret = bytes(self._handle.controlRead(Panda.REQUEST_IN, 0xe0, port_number, 0, 0x40))
|
|
if len(lret) == 0:
|
|
break
|
|
ret.append(lret)
|
|
return b''.join(ret)
|
|
|
|
def serial_write(self, port_number, ln):
|
|
ret = 0
|
|
for i in range(0, len(ln), 0x20):
|
|
ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + ln[i:i+0x20])
|
|
return ret
|
|
|
|
def serial_clear(self, port_number):
|
|
"""Clears all messages (tx and rx) from the specified internal uart
|
|
ringbuffer as though it were drained.
|
|
|
|
Args:
|
|
port_number (int): port number of the uart to clear.
|
|
|
|
"""
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf2, port_number, 0, b'')
|
|
|
|
# ******************* kline *******************
|
|
|
|
# pulse low for wakeup
|
|
def kline_wakeup(self):
|
|
self._handle.controlWrite(Panda.REQUEST_OUT, 0xf0, 0, 0, b'')
|
|
|
|
def kline_drain(self, bus=2):
|
|
# drain buffer
|
|
bret = bytearray()
|
|
while True:
|
|
ret = self._handle.controlRead(Panda.REQUEST_IN, 0xe0, bus, 0, 0x40)
|
|
if len(ret) == 0:
|
|
break
|
|
bret += ret
|
|
return bytes(bret)
|
|
|
|
def kline_ll_recv(self, cnt, bus=2):
|
|
echo = bytearray()
|
|
while len(echo) != cnt:
|
|
echo += self._handle.controlRead(Panda.REQUEST_OUT, 0xe0, bus, 0, cnt-len(echo))
|
|
return echo
|
|
|
|
def kline_send(self, x, bus=2, checksum=True):
|
|
def get_checksum(dat):
|
|
result = 0
|
|
result += sum(map(ord, dat)) if isinstance(b'dat', str) else sum(dat)
|
|
return struct.pack("B", result % 0x100)
|
|
|
|
self.kline_drain(bus=bus)
|
|
if checksum:
|
|
x += get_checksum(x)
|
|
for i in range(0, len(x), 0xf):
|
|
ts = x[i:i+0xf]
|
|
self._handle.bulkWrite(2, chr(bus).encode()+ts)
|
|
echo = self.kline_ll_recv(len(ts), bus=bus)
|
|
if echo != ts:
|
|
print("**** ECHO ERROR %d ****" % i)
|
|
print(binascii.hexlify(echo))
|
|
print(binascii.hexlify(ts))
|
|
assert echo == ts
|
|
|
|
def kline_recv(self, bus=2):
|
|
msg = self.kline_ll_recv(2, bus=bus)
|
|
msg += self.kline_ll_recv(ord(msg[1])-2, bus=bus)
|
|
return msg
|
|
|