dragonpilot 2022-12-30T07:51:42 for EON/C2

version: dragonpilot v0.9.1 beta for EON/C2
date: 2022-12-30T07:51:42
dp-dev(priv2) master commit: 4e8e00606410c2bbe03b93fc89a91caec37d1f3a
This commit is contained in:
Dragonpilot Team
2022-12-30 07:48:08 +00:00
parent 398aeb927b
commit 1f2e3aa8b6
179 changed files with 5786 additions and 7459 deletions

View File

@@ -29,17 +29,18 @@ BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
DEBUG = os.getenv("PANDADEBUG") is not None
CAN_TRANSACTION_MAGIC = struct.pack("<I", 0x43414E2F)
USBPACKET_MAX_SIZE = 0x40
CANPACKET_HEAD_SIZE = 0x5
DLC_TO_LEN = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64]
LEN_TO_DLC = {length: dlc for (dlc, length) in enumerate(DLC_TO_LEN)}
def pack_can_buffer(arr):
snds = [b'']
idx = 0
snds = [CAN_TRANSACTION_MAGIC]
for address, _, dat, bus in arr:
assert len(dat) in LEN_TO_DLC
if DEBUG:
print(f" W 0x{address:x}: 0x{dat.hex()}")
#logging.debug(" W 0x%x: 0x%s", address, dat.hex())
extended = 1 if address >= 0x800 else 0
data_len_code = LEN_TO_DLC[len(dat)]
header = bytearray(5)
@@ -49,57 +50,50 @@ def pack_can_buffer(arr):
header[2] = (word_4b >> 8) & 0xFF
header[3] = (word_4b >> 16) & 0xFF
header[4] = (word_4b >> 24) & 0xFF
snds[idx] += header + dat
if len(snds[idx]) > 256: # Limit chunks to 256 bytes
snds.append(b'')
idx += 1
# Apply counter to each 64 byte packet
for idx in range(len(snds)):
tx = b''
counter = 0
for i in range (0, len(snds[idx]), 63):
tx += bytes([counter]) + snds[idx][i:i+63]
counter += 1
snds[idx] = tx
snds[-1] += header + dat
if len(snds[-1]) > 256: # Limit chunks to 256 bytes
snds.append(CAN_TRANSACTION_MAGIC)
return snds
def unpack_can_buffer(dat):
ret = []
counter = 0
tail = bytearray()
for i in range(0, len(dat), 64):
if counter != dat[i]:
print("CAN: LOST RECV PACKET COUNTER")
break
counter+=1
chunk = tail + dat[i+1:i+64]
tail = bytearray()
pos = 0
while pos<len(chunk):
data_len = DLC_TO_LEN[(chunk[pos]>>4)]
pckt_len = CANPACKET_HEAD_SIZE + data_len
if pckt_len <= len(chunk[pos:]):
header = chunk[pos:pos+CANPACKET_HEAD_SIZE]
if len(header) < 5:
print("CAN: MALFORMED USB RECV PACKET")
break
bus = (header[0] >> 1) & 0x7
address = (header[4] << 24 | header[3] << 16 | header[2] << 8 | header[1]) >> 3
returned = (header[1] >> 1) & 0x1
rejected = header[1] & 0x1
data = chunk[pos + CANPACKET_HEAD_SIZE:pos + CANPACKET_HEAD_SIZE + data_len]
if returned:
bus += 128
if rejected:
bus += 192
if DEBUG:
print(f" R 0x{address:x}: 0x{data.hex()}")
ret.append((address, 0, data, bus))
pos += pckt_len
else:
tail = chunk[pos:]
break
if len(dat) < len(CAN_TRANSACTION_MAGIC):
return ret
if dat[:len(CAN_TRANSACTION_MAGIC)] != CAN_TRANSACTION_MAGIC:
logging.error("CAN: recv didn't start with magic")
return ret
dat = dat[len(CAN_TRANSACTION_MAGIC):]
while len(dat) >= CANPACKET_HEAD_SIZE:
data_len = DLC_TO_LEN[(dat[0]>>4)]
header = dat[:CANPACKET_HEAD_SIZE]
dat = dat[CANPACKET_HEAD_SIZE:]
bus = (header[0] >> 1) & 0x7
address = (header[4] << 24 | header[3] << 16 | header[2] << 8 | header[1]) >> 3
if (header[1] >> 1) & 0x1:
# returned
bus += 128
if header[1] & 0x1:
# rejected
bus += 192
data = dat[:data_len]
dat = dat[data_len:]
#logging.debug(" R 0x%x: 0x%s", address, data.hex())
ret.append((address, 0, data, bus))
if len(dat) > 0:
logging.error("CAN: malformed packet. leftover data")
return ret
def ensure_health_packet_version(fn):
@@ -167,6 +161,8 @@ class Panda:
SAFETY_FAW = 26
SAFETY_BODY = 27
SAFETY_HYUNDAI_CANFD = 28
SAFETY_VOLVO_C1 = 29
SAFETY_VOLVO_EUCD = 30
SERIAL_DEBUG = 0
SERIAL_ESP = 1
@@ -191,7 +187,7 @@ class Panda:
HW_TYPE_RED_PANDA_V2 = b'\x08'
HW_TYPE_TRES = b'\x09'
CAN_PACKET_VERSION = 2
CAN_PACKET_VERSION = 3
HEALTH_PACKET_VERSION = 11
CAN_HEALTH_PACKET_VERSION = 3
HEALTH_STRUCT = struct.Struct("<IIIIIIIIIBBBBBBHBBBHfBBB")
@@ -222,6 +218,7 @@ class Panda:
FLAG_HYUNDAI_CAMERA_SCC = 8
FLAG_HYUNDAI_CANFD_HDA2 = 16
FLAG_HYUNDAI_CANFD_ALT_BUTTONS = 32
FLAG_HYUNDAI_ALT_LIMITS = 64
FLAG_TESLA_POWERTRAIN = 1
FLAG_TESLA_LONG_CONTROL = 2

View File

@@ -20,10 +20,10 @@ if os.path.exists('/data/params/d/dp_atl'):
atl_enabled = True
sspoof_enabled = False
if os.path.exists('/data/params/d/dp_sspoof'):
with open('/data/params/d/dp_sspoof') as f:
if (int(f.read().strip())) != 0:
sspoof_enabled = True
# if os.path.exists('/data/params/d/dp_sspoof'):
# with open('/data/params/d/dp_sspoof') as f:
# if (int(f.read().strip())) != 0:
# sspoof_enabled = True
if atl_enabled and sspoof_enabled and os.path.exists(TESTING_SSPOOF_FW_FN):
DEFAULT_FW_FN = TESTING_SSPOOF_FW_FN