mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-19 22:53:57 +08:00
b42db6d Merge pull request #82 from commaai/uart_dma fd68f86 smallr be99ffc ok that doesn't hurt i think a9f6bf0 this 8b7e849 working now 7fa4808 froze up, maybe thats the fix 1465aa4 ok, it's fixed 915cd84 ugh, ok, need that fd05376 comment out debug 37c5263 big fifo 497f069 dma is all critical, no interrupts 7c34afe minor change 743d244 high baud rate works 5d2a4ba v1.0.6 fbf1390 Toyota Safety: fix in input param 6c01d09 Toyota: less torque error allowance to meet Corolla acceptable behavior 07c01b2 Toyota safety: using input param 4410a59 add safety param support fc81fc1 uart dma in progress 65fb2b2 grey panda query, 1.0.5 f415c9a grey panda detection b68957e add pandadebug support b5e4962 leave msgs around in isotp 0acce2d add recvaddr support 3fc38f4 set bootmode with power d4c052a make that work 21f8195 fix panda serial write af74aa9 from python import git-subtree-dir: panda git-subtree-split: b42db6dc082fb13ef5ac63ed197a63e179651ef6
496 lines
14 KiB
Python
496 lines
14 KiB
Python
# python library to interface with panda
|
|
from __future__ import print_function
|
|
import binascii
|
|
import struct
|
|
import hashlib
|
|
import socket
|
|
import usb1
|
|
import os
|
|
import time
|
|
import traceback
|
|
from dfu import PandaDFU
|
|
from esptool import ESPROM, CesantaFlasher
|
|
from flash_release import flash_release
|
|
from update import ensure_st_up_to_date
|
|
from serial import PandaSerial
|
|
|
|
# Version string of this python library.
__version__ = '0.0.6'

# "<directory containing this file>/../"; deliberately left un-normalized, so
# it always ends with "../".
BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")

# Debug tracing is enabled whenever PANDADEBUG exists in the environment,
# whatever its value (even an empty string).
DEBUG = "PANDADEBUG" in os.environ
|
|
|
|
# *** wifi mode ***
|
|
|
|
def build_st(target, mkfile="Makefile"):
    """Run ``make clean`` followed by ``make <target>`` in the board directory.

    Args:
        target: make target to build (e.g. a path such as "obj/panda.bin").
        mkfile: name of the makefile passed to ``make -f``.

    Raises:
        AssertionError: if the shell command exits with a non-zero status.
    """
    from panda import BASEDIR

    board_dir = os.path.join(BASEDIR, "board")
    cmd = 'cd %s && make -f %s clean && make -f %s %s >/dev/null' % (board_dir, mkfile, mkfile, target)

    # The command must not live inside an ``assert``: with ``python -O`` the
    # whole assert statement is stripped and the build would never run.
    status = os.system(cmd)
    if status != 0:
        raise AssertionError("build of %r failed (exit status %r)" % (target, status))
|
|
|
|
def parse_can_buffer(dat):
    """Decode a raw CAN buffer into a list of message tuples.

    The buffer is consumed in 0x10-byte records.  Each record starts with two
    native-order unsigned 32-bit words followed by the payload:

    * word 0: bit 2 marks an extended identifier; the address is then
      ``word0 >> 3``, otherwise ``word0 >> 21``.
    * word 1: the low nibble is the payload length, bits 4-11 become the last
      tuple element and bits 16 and up the second tuple element.

    Args:
        dat: bytes-like object holding zero or more records.

    Returns:
        A list of ``(address, word1 >> 16, payload, (word1 >> 4) & 0xFF)``
        tuples in buffer order.  A trailing fragment too short to hold the
        8-byte header is ignored.
    """
    ret = []
    extended = 4
    for j in range(0, len(dat), 0x10):
        ddat = dat[j:j+0x10]
        if len(ddat) < 8:
            # Incomplete header at the end of the buffer: nothing to decode.
            # (A short payload was already tolerated by the slice below.)
            break
        f1, f2 = struct.unpack("II", ddat[0:8])
        if f1 & extended:
            address = f1 >> 3
        else:
            address = f1 >> 21
        dddat = ddat[8:8+(f2 & 0xF)]
        if DEBUG:
            # hexlify works on both Python 2 and 3; str.encode("hex") is
            # Python 2 only.
            print(" R %x: %s" % (address, binascii.hexlify(dddat).decode("ascii")))
        ret.append((address, f2 >> 16, dddat, (f2 >> 4) & 0xFF))
    return ret
|
|
|
|
class PandaWifiStreaming(object):
    """Receive CAN messages streamed by a panda over UDP.

    A non-blocking datagram socket is created and a "hello" datagram is sent
    to ``(ip, port)`` on construction and on every ``kick()``.
    """

    def __init__(self, ip="192.168.0.10", port=1338):
        """Create the non-blocking UDP socket and send the first kick.

        Args:
            ip: address the datagrams are sent to and accepted from.
            port: UDP port on that address.
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setblocking(0)
        self.ip = ip
        self.port = port
        self.kick()

    def kick(self):
        """Send the keep-alive datagram."""
        # must be called at least every 5 seconds
        # A bytes literal is required: sendto() rejects text on Python 3.
        self.sock.sendto(b"hello", (self.ip, self.port))

    def can_recv(self):
        """Drain every pending datagram and return the decoded messages.

        Datagrams whose source is not ``(self.ip, self.port)`` are dropped.

        Returns:
            A list of tuples as produced by ``parse_can_buffer``; empty when
            nothing was pending.
        """
        ret = []
        while True:
            try:
                dat, addr = self.sock.recvfrom(0x200*0x10)
            except socket.error as e:
                # errno 35 / 11 are treated as "no more data" and ended
                # quietly (presumably EAGAIN on BSD/macOS and Linux); any
                # other error is reported. Either way the drain loop ends.
                if e.errno not in (35, 11):
                    traceback.print_exc()
                break
            if addr == (self.ip, self.port):
                ret += parse_can_buffer(dat)
        return ret
|
|
|
|
# stupid tunneling of USB over wifi and SPI
|
|
class WifiHandle(object):
    """USB-handle look-alike that tunnels transfers over a TCP connection.

    It offers the ``controlRead`` / ``controlWrite`` / ``bulkRead`` /
    ``bulkWrite`` / ``close`` methods that ``Panda`` calls on its handle.
    Every request is answered by one reply made of a native-order 32-bit
    length followed by that many payload bytes; at most 0x44 bytes of a reply
    are consumed.  The ``timeout`` arguments are accepted for signature
    compatibility and ignored.
    """

    def __init__(self, ip="192.168.0.10", port=1337):
        """Open the TCP connection to ``(ip, port)``."""
        self.sock = socket.create_connection((ip, port))

    def __recv(self):
        """Read one reply and return its payload.

        TCP is a byte stream, so a single ``recv`` may deliver only part of
        the reply; keep reading until the 4-byte length header and the
        announced payload (capped at the 0x44-byte reply limit) have arrived.

        Raises:
            socket.error: if the peer closes the connection mid-reply.
        """
        ret = b""
        want = 4
        length = 0
        while len(ret) < want:
            chunk = self.sock.recv(0x44 - len(ret))
            if not chunk:
                raise socket.error("connection closed while waiting for reply")
            ret += chunk
            if len(ret) >= 4:
                length = struct.unpack("I", ret[0:4])[0]
                want = min(4 + length, 0x44)
        return ret[4:4+length]

    def controlWrite(self, request_type, request, value, index, data, timeout=0):
        """Issue a control request without payload; returns the reply data."""
        # ignore data in reply, panda doesn't use it
        return self.controlRead(request_type, request, value, index, 0, timeout)

    def controlRead(self, request_type, request, value, index, length, timeout=0):
        """Issue a control request and return the reply payload."""
        # sendall: plain send() may transmit only part of the packet.
        self.sock.sendall(struct.pack("HHBBHHH", 0, 0, request_type, request, value, index, length))
        return self.__recv()

    def bulkWrite(self, endpoint, data, timeout=0):
        """Send up to 0x10 bytes to ``endpoint``.

        Returns:
            The number of bytes written (``len(data)``), so callers that sum
            the return value work as they do with a USB handle.

        Raises:
            ValueError: if ``data`` is longer than 0x10 bytes.
        """
        if len(data) > 0x10:
            raise ValueError("Data must not be longer than 0x10")
        self.sock.sendall(struct.pack("HH", endpoint, len(data)) + data)
        self.__recv()  # to /dev/null
        return len(data)

    def bulkRead(self, endpoint, length, timeout=0):
        """Request pending data from ``endpoint`` and return the reply payload.

        ``length`` is not transmitted; the size of the result is decided by
        the reply.
        """
        self.sock.sendall(struct.pack("HH", endpoint, 0))
        return self.__recv()

    def close(self):
        """Close the TCP connection."""
        self.sock.close()
|
|
|
|
# *** normal mode ***
|
|
|
|
class Panda(object):
    """Connection to a panda over USB, or over wifi when serial is "WIFI".

    All device access goes through ``self._handle``, which is either a usb1
    device handle or a ``WifiHandle``; both expose ``controlRead``,
    ``controlWrite``, ``bulkRead``, ``bulkWrite`` and ``close``.
    """

    # Values accepted by set_safety_mode().
    SAFETY_NOOUTPUT = 0
    SAFETY_HONDA = 1
    SAFETY_TOYOTA = 2
    SAFETY_TOYOTA_NOLIMITS = 0x1336
    SAFETY_ALLOUTPUT = 0x1337
    SAFETY_ELM327 = 0xE327

    # UART numbers (presumably the port_number/uart arguments of the serial
    # and set_uart_* methods -- confirm against firmware).
    SERIAL_DEBUG = 0
    SERIAL_ESP = 1
    SERIAL_LIN1 = 2
    SERIAL_LIN2 = 3

    # Values accepted by set_gmlan().
    GMLAN_CAN2 = 1
    GMLAN_CAN3 = 2

    # Request-type bytes for vendor control transfers addressed to the device.
    REQUEST_IN = usb1.ENDPOINT_IN | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
    REQUEST_OUT = usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE

    def __init__(self, serial=None, claim=True):
        """Connect to a panda.

        Args:
            serial: serial number to look for, ``None`` for the first device
                found, or the string "WIFI" to connect through ``WifiHandle``.
            claim: claim USB interface 0 after opening the device.
        """
        self._serial = serial
        self._handle = None
        self.connect(claim)

    def close(self):
        """Close the device handle; calling it again is a no-op."""
        if self._handle is not None:
            self._handle.close()
            self._handle = None

    def connect(self, claim=True, wait=False):
        """(Re)open the device, closing any handle that is already open.

        On USB the first device with vendor id 0xbbaa and product id 0xddcc or
        0xddee whose serial matches is opened; ``self.bootstub`` is set when
        the product id is 0xddee and ``self.legacy`` when bcdDevice is not
        0x2300.  When no serial was requested, the serial found is remembered.

        Args:
            claim: claim USB interface 0 after opening.
            wait: keep re-scanning until a device has been opened.

        Raises:
            AssertionError: if no handle could be opened.
        """
        if self._handle is not None:
            self.close()

        if self._serial == "WIFI":
            self._handle = WifiHandle()
            print("opening WIFI device")
            self.wifi = True
        else:
            context = usb1.USBContext()
            self._handle = None
            self.wifi = False

            while 1:
                try:
                    for device in context.getDeviceList(skip_on_error=True):
                        if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]:
                            try:
                                this_serial = device.getSerialNumber()
                            except Exception:
                                # Serial unreadable: skip this device.
                                continue
                            if self._serial is None or this_serial == self._serial:
                                self._serial = this_serial
                                print("opening device", self._serial, hex(device.getProductID()))
                                self.bootstub = device.getProductID() == 0xddee
                                self.legacy = (device.getbcdDevice() != 0x2300)
                                self._handle = device.open()
                                if claim:
                                    self._handle.claimInterface(0)
                                    # NOTE: setInterfaceAltSetting(0, 0) is not
                                    # called here ("Issue in USB stack").
                                break
                except Exception as e:
                    print("exception", e)
                    traceback.print_exc()
                if not wait or self._handle is not None:
                    break

        # Explicit raise rather than ``assert`` so the check survives -O.
        if self._handle is None:
            raise AssertionError("no panda device could be opened")
        print("connected")

    def reset(self, enter_bootstub=False, enter_bootloader=False):
        """Reset the device and, unless entering the bootloader, reconnect.

        Args:
            enter_bootstub: send request 0xd1 with value 1 instead of the
                plain reset request 0xd8.
            enter_bootloader: send request 0xd1 with value 0 and return
                without reconnecting (takes precedence over enter_bootstub).

        Raises:
            Exception: "reset failed" when 15 reconnect attempts all fail.
        """
        try:
            if enter_bootloader:
                self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 0, 0, b'')
            elif enter_bootstub:
                self._handle.controlWrite(Panda.REQUEST_IN, 0xd1, 1, 0, b'')
            else:
                self._handle.controlWrite(Panda.REQUEST_IN, 0xd8, 0, 0, b'')
        except Exception:
            # Deliberately best-effort: presumably the transfer can fail
            # because the device restarts while handling it.
            pass

        if not enter_bootloader:
            self.close()
            time.sleep(1.0)
            success = False
            # wait up to 15 seconds
            for i in range(15):
                try:
                    self.connect()
                    success = True
                    break
                except Exception:
                    print("reconnecting is taking %d seconds..." % (i+1))
                    try:
                        # Try to bring the device back from DFU mode.
                        dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial))
                        dfu.recover()
                    except Exception:
                        pass
                    time.sleep(1.0)
            if not success:
                raise Exception("reset failed")

    def flash(self, fn=None, code=None):
        """Write firmware to the device through the bootstub, then reset.

        Args:
            fn: path of the firmware image; when both ``fn`` and ``code`` are
                ``None`` the image is built first (``obj/comma.bin`` with
                ``Makefile.legacy`` for legacy devices, else ``obj/panda.bin``).
            code: firmware bytes; when given, no file is read.

        Raises:
            AssertionError: if the device is not in bootstub mode or the
                flasher signature is not found.
        """
        if not self.bootstub:
            self.reset(enter_bootstub=True)
        if not self.bootstub:
            raise AssertionError("device did not enter bootstub")

        if fn is None and code is None:
            if self.legacy:
                fn = "obj/comma.bin"
                print("building legacy st code")
                build_st(fn, "Makefile.legacy")
            else:
                fn = "obj/panda.bin"
                print("building panda st code")
                build_st(fn)
            fn = os.path.join(BASEDIR, "board", fn)

        if code is None:
            # Firmware is binary data: text mode would decode or translate it.
            with open(fn, "rb") as f:
                code = f.read()

        # get version
        version = self.get_version()
        if not isinstance(version, str):
            # bytes/bytearray cannot be concatenated to text on Python 3.
            version = bytes(version).decode("utf-8", "replace")
        print("flash: version is " + version)

        # confirm flasher is present
        fr = self._handle.controlRead(Panda.REQUEST_IN, 0xb0, 0, 0, 0xc)
        # Compare against bytes: a text literal never equals bytes on
        # Python 3. Raised explicitly so -O cannot skip the check before
        # the flash is erased.
        if fr[4:8] != b"\xde\xad\xd0\x0d":
            raise AssertionError("flasher signature not found")

        # unlock flash
        print("flash: unlocking")
        self._handle.controlWrite(Panda.REQUEST_IN, 0xb1, 0, 0, b'')

        # erase sectors 1 and 2
        print("flash: erasing")
        self._handle.controlWrite(Panda.REQUEST_IN, 0xb2, 1, 0, b'')
        self._handle.controlWrite(Panda.REQUEST_IN, 0xb2, 2, 0, b'')

        # flash over EP2
        STEP = 0x10
        print("flash: flashing")
        for i in range(0, len(code), STEP):
            self._handle.bulkWrite(2, code[i:i+STEP])

        # reset
        print("flash: resetting")
        self.reset()

    def recover(self):
        """Enter the bootloader, run DFU recovery, reconnect and reflash."""
        self.reset(enter_bootloader=True)
        while len(PandaDFU.list()) == 0:
            print("waiting for DFU...")
            time.sleep(0.1)

        dfu = PandaDFU(PandaDFU.st_serial_to_dfu_serial(self._serial))
        dfu.recover()

        # reflash after recover
        self.connect(True, True)
        self.flash()

    @staticmethod
    def flash_ota_st():
        """Run ``make clean && make ota`` in ``board``; True on exit status 0."""
        ret = os.system("cd %s && make clean && make ota" % (os.path.join(BASEDIR, "board")))
        time.sleep(1)
        return ret == 0

    @staticmethod
    def flash_ota_wifi():
        """Run ``make clean && make ota`` in ``boardesp``; True on exit status 0."""
        ret = os.system("cd %s && make clean && make ota" % (os.path.join(BASEDIR, "boardesp")))
        time.sleep(1)
        return ret == 0

    @staticmethod
    def list():
        """Return the serial numbers of all connected pandas.

        Devices whose serial cannot be read are skipped; an error while
        enumerating returns whatever was collected so far.
        """
        context = usb1.USBContext()
        ret = []
        try:
            for device in context.getDeviceList(skip_on_error=True):
                if device.getVendorID() == 0xbbaa and device.getProductID() in [0xddcc, 0xddee]:
                    try:
                        ret.append(device.getSerialNumber())
                    except Exception:
                        continue
        except Exception:
            pass
        # TODO: detect if a WIFI panda is really reachable before adding
        # "WIFI" to the list.
        return ret

    def call_control_api(self, msg):
        """Send the control request ``msg`` with value 0 and index 0."""
        self._handle.controlWrite(Panda.REQUEST_OUT, msg, 0, 0, b'')

    # ******************* health *******************

    def health(self):
        """Read the 13-byte health record (request 0xd2) as a dict.

        Returns:
            Dict with keys "voltage", "current", "started",
            "controls_allowed", "gas_interceptor_detected",
            "started_signal_detected" and "started_alt".
        """
        dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd2, 0, 0, 13)
        a = struct.unpack("IIBBBBB", dat)
        return {"voltage": a[0], "current": a[1],
                "started": a[2], "controls_allowed": a[3],
                "gas_interceptor_detected": a[4],
                "started_signal_detected": a[5],
                "started_alt": a[6]}

    # ******************* control *******************

    def enter_bootloader(self):
        """Send request 0xd1 with value 0; a transfer error is only printed."""
        try:
            self._handle.controlWrite(Panda.REQUEST_OUT, 0xd1, 0, 0, b'')
        except Exception as e:
            print(e)

    def get_version(self):
        """Return the raw reply (up to 0x40 bytes) to version request 0xd6."""
        return self._handle.controlRead(Panda.REQUEST_IN, 0xd6, 0, 0, 0x40)

    def get_serial(self):
        """Read and verify the 0x20-byte serial record (request 0xd0, value 0).

        The last 4 bytes must equal the first 4 bytes of the SHA-1 digest of
        the preceding 0x1c bytes.

        Returns:
            ``[dat[0:0x10], dat[0x10:0x1a]]``.

        Raises:
            AssertionError: if the checksum does not match.
        """
        dat = self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 0, 0, 0x20)
        hashsig, calc_hash = dat[0x1c:], hashlib.sha1(dat[0:0x1c]).digest()[0:4]
        if hashsig != calc_hash:
            raise AssertionError("serial record checksum mismatch")
        return [dat[0:0x10], dat[0x10:0x10+10]]

    def get_secret(self):
        """Return the raw 0x10-byte reply to request 0xd0 with value 1."""
        return self._handle.controlRead(Panda.REQUEST_IN, 0xd0, 1, 0, 0x10)

    # ******************* configuration *******************

    def set_usb_power(self, on):
        """Send request 0xe6 with ``int(on)`` as value."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xe6, int(on), 0, b'')

    def set_esp_power(self, on):
        """Send request 0xd9 with ``int(on)`` as value."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xd9, int(on), 0, b'')

    def esp_reset(self, bootmode=0):
        """Send request 0xda with ``int(bootmode)``, then wait 0.2 seconds."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xda, int(bootmode), 0, b'')
        time.sleep(0.2)

    def set_safety_mode(self, mode=SAFETY_NOOUTPUT):
        """Send request 0xdc with ``mode`` (one of the SAFETY_* constants)."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xdc, mode, 0, b'')

    def set_can_forwarding(self, from_bus, to_bus):
        """Send request 0xdd with ``from_bus`` as value and ``to_bus`` as index."""
        # TODO: This feature may not work correctly with saturated buses
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xdd, from_bus, to_bus, b'')

    def set_gmlan(self, bus=2):
        """Configure GMLAN through request 0xdb.

        ``bus=None`` sends value 0; GMLAN_CAN2 or GMLAN_CAN3 sends value 1
        with the bus as index.  Any other ``bus`` sends nothing.
        """
        if bus is None:
            self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, 0, 0, b'')
        elif bus in [Panda.GMLAN_CAN2, Panda.GMLAN_CAN3]:
            self._handle.controlWrite(Panda.REQUEST_OUT, 0xdb, 1, bus, b'')

    def set_can_loopback(self, enable):
        """Send request 0xe5 with ``int(enable)``."""
        # set can loopback mode for all buses
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xe5, int(enable), 0, b'')

    def set_can_speed_kbps(self, bus, speed):
        """Send request 0xde with ``bus`` and ``int(speed*10)``."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xde, bus, int(speed*10), b'')

    def set_uart_baud(self, uart, rate):
        """Send request 0xe4 with ``uart`` and ``rate`` in units of 300 baud."""
        # Floor division keeps the index an integer on Python 3 as well.
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xe4, uart, rate // 300, b'')

    def set_uart_parity(self, uart, parity):
        """Send request 0xe2 with ``uart`` and ``parity``."""
        # parity, 0=off, 1=even, 2=odd
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xe2, uart, parity, b'')

    def set_uart_callback(self, uart, install):
        """Send request 0xe3 with ``uart`` and ``int(install)``."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xe3, uart, int(install), b'')

    # ******************* can *******************

    def can_send_many(self, arr):
        """Send a batch of CAN messages over bulk endpoint 3.

        Args:
            arr: iterable of ``(addr, ignored, dat, bus)``; ``dat`` holds at
                most 8 bytes.  Addresses >= 0x800 are sent as extended.

        Raises:
            AssertionError: if a payload is longer than 8 bytes.

        The write is retried for as long as it fails with ``USBErrorIO`` or
        ``USBErrorOverflow``.
        """
        snds = []
        transmit = 1
        extended = 4
        for addr, _, dat, bus in arr:
            if len(dat) > 8:
                raise AssertionError("CAN payload longer than 8 bytes")
            if DEBUG:
                # hexlify works on Python 2 and 3; str.encode("hex") does not.
                print(" W %x: %s" % (addr, binascii.hexlify(dat).decode("ascii")))
            if addr >= 0x800:
                rir = (addr << 3) | transmit | extended
            else:
                rir = (addr << 21) | transmit
            snd = struct.pack("II", rir, len(dat) | (bus << 4)) + dat
            snd = snd.ljust(0x10, b'\x00')
            snds.append(snd)

        while True:
            try:
                if self.wifi:
                    # One record per write over wifi.
                    for s in snds:
                        self._handle.bulkWrite(3, s)
                else:
                    self._handle.bulkWrite(3, b''.join(snds))
                break
            except (usb1.USBErrorIO, usb1.USBErrorOverflow):
                print("CAN: BAD SEND MANY, RETRYING")

    def can_send(self, addr, dat, bus):
        """Send a single CAN message; see ``can_send_many``."""
        self.can_send_many([[addr, None, dat, bus]])

    def can_recv(self):
        """Read bulk endpoint 1 and return the decoded CAN messages.

        The read is retried for as long as it fails with ``USBErrorIO`` or
        ``USBErrorOverflow``.

        Returns:
            The list produced by ``parse_can_buffer``.
        """
        dat = bytearray()
        while True:
            try:
                dat = self._handle.bulkRead(1, 0x10*256)
                break
            except (usb1.USBErrorIO, usb1.USBErrorOverflow):
                print("CAN: BAD RECV, RETRYING")
        return parse_can_buffer(dat)

    def can_clear(self, bus):
        """Clears all messages from the specified internal CAN ringbuffer as
        though it were drained.

        Args:
            bus (int): can bus number to clear a tx queue, or 0xFFFF to clear
                the global can rx queue.
        """
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xf1, bus, 0, b'')

    # ******************* serial *******************

    def serial_read(self, port_number):
        """Read from ``port_number`` in 0x40-byte requests until one is empty.

        Returns:
            All received bytes joined together.
        """
        ret = []
        while 1:
            lret = bytes(self._handle.controlRead(Panda.REQUEST_IN, 0xe0, port_number, 0, 0x40))
            if len(lret) == 0:
                break
            ret.append(lret)
        return b''.join(ret)

    def serial_write(self, port_number, ln):
        """Write ``ln`` to ``port_number`` in chunks of up to 0x20 bytes.

        Each chunk is sent on bulk endpoint 2 prefixed with the port number.

        Returns:
            The sum of the values returned by the handle's ``bulkWrite``.
        """
        ret = 0
        for i in range(0, len(ln), 0x20):
            ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + ln[i:i+0x20])
        return ret

    def serial_clear(self, port_number):
        """Clears all messages (tx and rx) from the specified internal uart
        ringbuffer as though it were drained.

        Args:
            port_number (int): port number of the uart to clear.
        """
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xf2, port_number, 0, b'')

    # ******************* kline *******************

    # pulse low for wakeup
    def kline_wakeup(self):
        """Send request 0xf0."""
        self._handle.controlWrite(Panda.REQUEST_OUT, 0xf0, 0, 0, b'')

    def kline_drain(self, bus=2):
        """Read from ``bus`` until a read is empty; return the drained bytes."""
        # drain buffer
        bret = bytearray()
        while True:
            ret = self._handle.controlRead(Panda.REQUEST_IN, 0xe0, bus, 0, 0x40)
            if len(ret) == 0:
                break
            bret += ret
        return bytes(bret)

    def kline_ll_recv(self, cnt, bus=2):
        """Read from ``bus`` until ``cnt`` bytes have been collected.

        Returns:
            A bytearray with the received bytes (empty when ``cnt <= 0``).
            The call does not return until enough data has arrived.
        """
        echo = bytearray()
        # ``<`` rather than ``!=`` so a non-positive cnt cannot loop forever.
        while len(echo) < cnt:
            # This is a read, so it uses REQUEST_IN like the other 0xe0 reads.
            echo += self._handle.controlRead(Panda.REQUEST_IN, 0xe0, bus, 0, cnt-len(echo))
        return echo

    def kline_send(self, x, bus=2, checksum=True):
        """Send ``x`` on ``bus`` in chunks of up to 0xf bytes, checking the echo.

        Args:
            x: bytes-like payload; it is not modified.
            bus: uart number the data is written to and echoed from.
            checksum: append one byte holding the sum of ``x`` modulo 0x100.

        Raises:
            AssertionError: if the bytes read back differ from a chunk sent.
        """
        self.kline_drain(bus=bus)
        if checksum:
            # Build a new object instead of ``+=`` so a caller's bytearray is
            # left untouched. bytearray() yields ints on Python 2 and 3 alike.
            x = x + struct.pack("B", sum(bytearray(x)) % 0x100)
        for i in range(0, len(x), 0xf):
            ts = x[i:i+0xf]
            self._handle.bulkWrite(2, struct.pack("B", bus) + ts)
            echo = self.kline_ll_recv(len(ts), bus=bus)
            if echo != ts:
                print("**** ECHO ERROR %d ****" % i)
                print(binascii.hexlify(echo))
                print(binascii.hexlify(ts))
                raise AssertionError("kline echo mismatch")

    def kline_recv(self, bus=2):
        """Receive one message: two header bytes, then the remaining bytes.

        The second header byte is taken as the message length including the
        two header bytes.

        Returns:
            A bytearray holding the header followed by the remaining bytes.
        """
        msg = self.kline_ll_recv(2, bus=bus)
        # Indexing a bytearray already yields an int; ord() would raise.
        msg += self.kline_ll_recv(msg[1]-2, bus=bus)
        return msg
|
|
|