uds: no need for threads if you always drain rx

This commit is contained in:
Greg Hogan
2019-11-12 18:52:41 -08:00
parent 91b7c5bb77
commit 68c39fb3e2

View File

@@ -1,10 +1,8 @@
#!/usr/bin/env python3
import time
import struct
from typing import NamedTuple, List
from typing import Callable, NamedTuple, Tuple, List
from enum import IntEnum
from queue import Queue, Empty
from threading import Thread
from binascii import hexlify
class SERVICE_TYPE(IntEnum):
@@ -271,14 +269,50 @@ _negative_response_codes = {
0x93: 'voltage too low',
}
class CanClient():
def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addrs: int, bus: int, debug: bool=False):
self.tx = can_send
self.rx = can_recv
self.tx_addr = tx_addr
self.rx_addrs = rx_addrs
self.bus = bus
self.debug = debug
def recv(self, drain=False) -> List[bytes]:
msg_array = []
while True:
msgs = self.rx()
if drain:
if self.debug: print("CAN-RX: drain - {}".format(len(msgs)))
else:
for rx_addr, rx_ts, rx_data, rx_bus in msgs or []:
if rx_bus == self.bus and rx_addr in self.rx_addrs and len(rx_data) > 0:
if self.debug: print("CAN-RX: {} - {}".format(hex(rx_addr), hexlify(rx_data)))
msg_array.append(rx_data)
# break when non-full buffer is processed
if len(msgs) < 254:
return msg_array
def send(self, msgs: List[bytes], delay: float=0) -> None:
first = True
for msg in msgs:
if not first and delay:
if self.debug: print(f"CAN-TX: delay - {delay}")
time.sleep(delay)
if self.debug: print("CAN-TX: {} - {}".format(hex(self.tx_addr), hexlify(msg)))
self.tx(self.tx_addr, msg, self.bus)
first = False
class IsoTpMessage():
def __init__(self, can_tx_queue: Queue, can_rx_queue: Queue, timeout: float, debug: bool=False):
self.can_tx_queue = can_tx_queue
self.can_rx_queue = can_rx_queue
def __init__(self, can_client: CanClient, timeout: float=1, debug: bool=False):
self._can_client = can_client
self.timeout = timeout
self.debug = debug
def send(self, dat: bytes) -> None:
# throw away any stale data
self._can_client.recv(drain=True)
self.tx_dat = dat
self.tx_len = len(dat)
self.tx_idx = 0
@@ -297,7 +331,7 @@ class IsoTpMessage():
# first frame (send first 6 bytes)
if self.debug: print("ISO-TP: TX - first frame")
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:6]).ljust(8, b"\x00")
self.can_tx_queue.put(msg)
self._can_client.send([msg])
def recv(self) -> bytes:
self.rx_dat = b""
@@ -305,19 +339,19 @@ class IsoTpMessage():
self.rx_idx = 0
self.rx_done = False
start_time = time.time()
try:
while True:
self._isotp_rx_next()
if self.tx_done and self.rx_done:
return self.rx_dat
except Empty:
raise MessageTimeoutError("timeout waiting for response")
for msg in self._can_client.recv():
self._isotp_rx_next(msg)
if self.tx_done and self.rx_done:
return self.rx_dat
if time.time() - start_time > self.timeout:
raise MessageTimeoutError("timeout waiting for response")
finally:
if self.debug: print(f"ISO-TP: RESPONSE - {hexlify(self.rx_dat)}")
def _isotp_rx_next(self) -> None:
rx_data = self.can_rx_queue.get(block=True, timeout=self.timeout)
def _isotp_rx_next(self, rx_data: bytes) -> None:
# single rx_frame
if rx_data[0] >> 4 == 0x0:
self.rx_len = rx_data[0] & 0xFF
@@ -337,9 +371,9 @@ class IsoTpMessage():
if self.debug: print(f"ISO-TP: TX - flow control continue")
# send flow control message (send all bytes)
msg = b"\x30\x00\x00".ljust(8, b"\x00")
self.can_tx_queue.put(msg)
self._can_client.send([msg])
return
# consecutive rx frame
if rx_data[0] >> 4 == 0x2:
assert self.rx_done == False, "isotp - rx: consecutive frame with no active frame"
@@ -362,19 +396,19 @@ class IsoTpMessage():
delay_ts = rx_data[2] & 0x7F
# scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1
delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000.
delay_sec = delay_ts / delay_div
# first frame = 6 bytes, each consecutive frame = 7 bytes
start = 6 + self.tx_idx * 7
count = rx_data[1]
end = start + count * 7 if count > 0 else self.tx_len
tx_msgs = []
for i in range(start, end, 7):
if delay_ts > 0 and i > start:
delay_s = delay_ts / delay_div
if self.debug: print(f"ISO-TP: TX - delay - seconds={delay_s}")
time.sleep(delay_s)
self.tx_idx += 1
# consecutive tx frames
# consecutive tx messages
msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i+7]).ljust(8, b"\x00")
self.can_tx_queue.put(msg)
tx_msgs.append(msg)
# send consecutive tx messages
self._can_client.send(tx_msgs, delay=delay_sec)
if end >= self.tx_len:
self.tx_done = True
if self.debug: print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}")
@@ -383,8 +417,7 @@ class IsoTpMessage():
if self.debug: print("ISO-TP: TX - flow control wait")
class UdsClient():
def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: float=10, debug: bool=False):
self.panda = panda
def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: float=1, debug: bool=False):
self.bus = bus
self.tx_addr = tx_addr
if rx_addr is None:
@@ -396,54 +429,12 @@ class UdsClient():
self.rx_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF)
else:
raise ValueError("invalid tx_addr: {}".format(tx_addr))
self.can_tx_queue = Queue()
self.can_rx_queue = Queue()
self.timeout = timeout
self.debug = debug
self.can_thread = Thread(target=self._can_thread, args=(self.debug,))
self.can_thread.daemon = True
self.can_thread.start()
def _can_thread(self, debug: bool=False):
try:
while True:
# send
tx_cnt = 0
while tx_cnt < 256 and not self.can_tx_queue.empty():
try:
msg = self.can_tx_queue.get(block=False)
tx_cnt += 1
if debug: print("CAN-TX: {} - {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
except Empty:
pass
# receive
rx_cnt = 0
while rx_cnt < 4096:
msgs = self.panda.can_recv()
if not msgs:
break
rx_cnt += len(msgs)
for rx_addr, rx_ts, rx_data, rx_bus in msgs:
if rx_bus != self.bus or rx_addr != self.rx_addr or len(rx_data) == 0:
continue
if debug: print("CAN-RX: {} - {}".format(hex(self.rx_addr), hexlify(rx_data)))
self.can_rx_queue.put(rx_data)
finally:
self.panda.close()
self._can_client = CanClient(panda.can_send, panda.can_recv, self.tx_addr, [self.rx_addr], self.bus, debug=self.debug)
# generic uds request
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes:
# throw away any stale data
while not self.can_rx_queue.empty():
try:
self.can_rx_queue.get(block=False)
except Empty:
pass
req = bytes([service_type])
if subfunction is not None:
req += bytes([subfunction])
@@ -451,7 +442,7 @@ class UdsClient():
req += data
# send request, wait for response
isotp_msg = IsoTpMessage(self.can_tx_queue, self.can_rx_queue, self.timeout, self.debug)
isotp_msg = IsoTpMessage(self._can_client, self.timeout, self.debug)
isotp_msg.send(req)
while True:
resp = isotp_msg.recv()