mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-02-19 13:34:01 +08:00
Merge panda subtree
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
# python library to interface with panda
|
||||
from __future__ import print_function
|
||||
|
||||
import binascii
|
||||
import struct
|
||||
import hashlib
|
||||
@@ -9,12 +9,12 @@ import os
|
||||
import time
|
||||
import traceback
|
||||
import subprocess
|
||||
from dfu import PandaDFU
|
||||
from esptool import ESPROM, CesantaFlasher
|
||||
from flash_release import flash_release
|
||||
from update import ensure_st_up_to_date
|
||||
from serial import PandaSerial
|
||||
from isotp import isotp_send, isotp_recv
|
||||
from .dfu import PandaDFU
|
||||
from .esptool import ESPROM, CesantaFlasher
|
||||
from .flash_release import flash_release
|
||||
from .update import ensure_st_up_to_date
|
||||
from .serial import PandaSerial
|
||||
from .isotp import isotp_send, isotp_recv
|
||||
|
||||
__version__ = '0.0.9'
|
||||
|
||||
@@ -23,7 +23,6 @@ BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
|
||||
DEBUG = os.getenv("PANDADEBUG") is not None
|
||||
|
||||
# *** wifi mode ***
|
||||
|
||||
def build_st(target, mkfile="Makefile"):
|
||||
from panda import BASEDIR
|
||||
cmd = 'cd %s && make -f %s clean && make -f %s %s >/dev/null' % (os.path.join(BASEDIR, "board"), mkfile, mkfile, target)
|
||||
@@ -46,7 +45,7 @@ def parse_can_buffer(dat):
|
||||
address = f1 >> 21
|
||||
dddat = ddat[8:8+(f2&0xF)]
|
||||
if DEBUG:
|
||||
print(" R %x: %s" % (address, str(dddat).encode("hex")))
|
||||
print(" R %x: %s" % (address, binascii.hexlify(dddat)))
|
||||
ret.append((address, f2>>16, dddat, (f2>>4)&0xFF))
|
||||
return ret
|
||||
|
||||
@@ -109,20 +108,25 @@ class WifiHandle(object):
|
||||
# *** normal mode ***
|
||||
|
||||
class Panda(object):
|
||||
|
||||
# matches cereal.car.CarParams.SafetyModel
|
||||
SAFETY_NOOUTPUT = 0
|
||||
SAFETY_HONDA = 1
|
||||
SAFETY_TOYOTA = 2
|
||||
SAFETY_GM = 3
|
||||
SAFETY_HONDA_BOSCH = 4
|
||||
SAFETY_FORD = 5
|
||||
SAFETY_CADILLAC = 6
|
||||
SAFETY_HYUNDAI = 7
|
||||
SAFETY_TESLA = 8
|
||||
SAFETY_ELM327 = 3
|
||||
SAFETY_GM = 4
|
||||
SAFETY_HONDA_BOSCH = 5
|
||||
SAFETY_FORD = 6
|
||||
SAFETY_CADILLAC = 7
|
||||
SAFETY_HYUNDAI = 8
|
||||
SAFETY_CHRYSLER = 9
|
||||
SAFETY_TOYOTA_IPAS = 0x1335
|
||||
SAFETY_TOYOTA_NOLIMITS = 0x1336
|
||||
SAFETY_ALLOUTPUT = 0x1337
|
||||
SAFETY_ELM327 = 0xE327
|
||||
SAFETY_TESLA = 10
|
||||
SAFETY_SUBARU = 11
|
||||
SAFETY_GM_PASSIVE = 12
|
||||
SAFETY_MAZDA = 13
|
||||
SAFETY_TOYOTA_IPAS = 16
|
||||
SAFETY_ALLOUTPUT = 17
|
||||
SAFETY_GM_ASCM = 18
|
||||
|
||||
SERIAL_DEBUG = 0
|
||||
SERIAL_ESP = 1
|
||||
@@ -135,11 +139,11 @@ class Panda(object):
|
||||
REQUEST_IN = usb1.ENDPOINT_IN | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
|
||||
REQUEST_OUT = usb1.ENDPOINT_OUT | usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE
|
||||
|
||||
HW_TYPE_UNKNOWN = '\x00'
|
||||
HW_TYPE_WHITE_PANDA = '\x01'
|
||||
HW_TYPE_GREY_PANDA = '\x02'
|
||||
HW_TYPE_BLACK_PANDA = '\x03'
|
||||
HW_TYPE_PEDAL = '\x04'
|
||||
HW_TYPE_UNKNOWN = b'\x00'
|
||||
HW_TYPE_WHITE_PANDA = b'\x01'
|
||||
HW_TYPE_GREY_PANDA = b'\x02'
|
||||
HW_TYPE_BLACK_PANDA = b'\x03'
|
||||
HW_TYPE_PEDAL = b'\x04'
|
||||
|
||||
def __init__(self, serial=None, claim=True):
|
||||
self._serial = serial
|
||||
@@ -232,7 +236,7 @@ class Panda(object):
|
||||
def flash_static(handle, code):
|
||||
# confirm flasher is present
|
||||
fr = handle.controlRead(Panda.REQUEST_IN, 0xb0, 0, 0, 0xc)
|
||||
assert fr[4:8] == "\xde\xad\xd0\x0d"
|
||||
assert fr[4:8] == b"\xde\xad\xd0\x0d"
|
||||
|
||||
# unlock flash
|
||||
print("flash: unlocking")
|
||||
@@ -274,7 +278,7 @@ class Panda(object):
|
||||
fn = os.path.join(BASEDIR, "board", fn)
|
||||
|
||||
if code is None:
|
||||
with open(fn) as f:
|
||||
with open(fn, "rb") as f:
|
||||
code = f.read()
|
||||
|
||||
# get version
|
||||
@@ -364,7 +368,7 @@ class Panda(object):
|
||||
pass
|
||||
|
||||
def get_version(self):
|
||||
return self._handle.controlRead(Panda.REQUEST_IN, 0xd6, 0, 0, 0x40)
|
||||
return self._handle.controlRead(Panda.REQUEST_IN, 0xd6, 0, 0, 0x40).decode('utf8')
|
||||
|
||||
def get_type(self):
|
||||
return self._handle.controlRead(Panda.REQUEST_IN, 0xc1, 0, 0, 0x40)
|
||||
@@ -429,7 +433,7 @@ class Panda(object):
|
||||
self._handle.controlWrite(Panda.REQUEST_OUT, 0xde, bus, int(speed*10), b'')
|
||||
|
||||
def set_uart_baud(self, uart, rate):
|
||||
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe4, uart, rate/300, b'')
|
||||
self._handle.controlWrite(Panda.REQUEST_OUT, 0xe4, uart, int(rate/300), b'')
|
||||
|
||||
def set_uart_parity(self, uart, parity):
|
||||
# parity, 0=off, 1=even, 2=odd
|
||||
@@ -447,7 +451,7 @@ class Panda(object):
|
||||
for addr, _, dat, bus in arr:
|
||||
assert len(dat) <= 8
|
||||
if DEBUG:
|
||||
print(" W %x: %s" % (addr, dat.encode("hex")))
|
||||
print(" W %x: %s" % (addr, binascii.hexlify(dat)))
|
||||
if addr >= 0x800:
|
||||
rir = (addr << 3) | transmit | extended
|
||||
else:
|
||||
@@ -479,7 +483,7 @@ class Panda(object):
|
||||
break
|
||||
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
|
||||
print("CAN: BAD RECV, RETRYING")
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.1)
|
||||
return parse_can_buffer(dat)
|
||||
|
||||
def can_clear(self, bus):
|
||||
@@ -546,7 +550,7 @@ class Panda(object):
|
||||
if len(ret) == 0:
|
||||
break
|
||||
elif DEBUG:
|
||||
print("kline drain: "+str(ret).encode("hex"))
|
||||
print("kline drain: " + binascii.hexlify(ret))
|
||||
bret += ret
|
||||
return bytes(bret)
|
||||
|
||||
@@ -555,7 +559,7 @@ class Panda(object):
|
||||
while len(echo) != cnt:
|
||||
ret = str(self._handle.controlRead(Panda.REQUEST_OUT, 0xe0, bus, 0, cnt-len(echo)))
|
||||
if DEBUG and len(ret) > 0:
|
||||
print("kline recv: "+ret.encode("hex"))
|
||||
print("kline recv: " + binascii.hexlify(ret))
|
||||
echo += ret
|
||||
return str(echo)
|
||||
|
||||
@@ -572,7 +576,7 @@ class Panda(object):
|
||||
for i in range(0, len(x), 0xf):
|
||||
ts = x[i:i+0xf]
|
||||
if DEBUG:
|
||||
print("kline send: "+ts.encode("hex"))
|
||||
print("kline send: " + binascii.hexlify(ts))
|
||||
self._handle.bulkWrite(2, chr(bus).encode()+ts)
|
||||
echo = self.kline_ll_recv(len(ts), bus=bus)
|
||||
if echo != ts:
|
||||
|
||||
Reference in New Issue
Block a user