simple joystick example (#1200)
* setup runner * port over joystickd * fix mypy * set accel and steer
This commit is contained in:
parent
a1b95d784e
commit
cbad7f0066
|
@ -0,0 +1,118 @@
|
|||
#!/usr/bin/env python3
|
||||
import time
|
||||
import threading
|
||||
import argparse
|
||||
import numpy as np
|
||||
from inputs import get_gamepad
|
||||
|
||||
from kbhit import KBHit
|
||||
|
||||
from opendbc.car.structs import CarControl
|
||||
from opendbc.car.panda_runner import PandaRunner
|
||||
from opendbc.car.can_definitions import CanData
|
||||
|
||||
class Keyboard:
|
||||
def __init__(self):
|
||||
self.kb = KBHit()
|
||||
self.axis_increment = 0.05 # 5% of full actuation each key press
|
||||
self.axes_map = {'w': 'gb', 's': 'gb',
|
||||
'a': 'steer', 'd': 'steer'}
|
||||
self.axes_values = {'gb': 0., 'steer': 0.}
|
||||
self.axes_order = ['gb', 'steer']
|
||||
self.cancel = False
|
||||
|
||||
def update(self):
|
||||
key = self.kb.getch().lower()
|
||||
print(key)
|
||||
self.cancel = False
|
||||
if key == 'r':
|
||||
self.axes_values = {ax: 0. for ax in self.axes_values}
|
||||
elif key == 'c':
|
||||
self.cancel = True
|
||||
elif key in self.axes_map:
|
||||
axis = self.axes_map[key]
|
||||
incr = self.axis_increment if key in ['w', 'a'] else -self.axis_increment
|
||||
self.axes_values[axis] = float(np.clip(self.axes_values[axis] + incr, -1, 1))
|
||||
else:
|
||||
return False
|
||||
return True
|
||||
|
||||
class Joystick:
|
||||
def __init__(self, gamepad=False):
|
||||
# TODO: find a way to get this from API, perhaps "inputs" doesn't support it
|
||||
if gamepad:
|
||||
self.cancel_button = 'BTN_NORTH' # (BTN_NORTH=X, ABS_RZ=Right Trigger)
|
||||
accel_axis = 'ABS_Y'
|
||||
steer_axis = 'ABS_RX'
|
||||
else:
|
||||
self.cancel_button = 'BTN_TRIGGER'
|
||||
accel_axis = 'ABS_Y'
|
||||
steer_axis = 'ABS_RX'
|
||||
self.min_axis_value = {accel_axis: 0., steer_axis: 0.}
|
||||
self.max_axis_value = {accel_axis: 255., steer_axis: 255.}
|
||||
self.axes_values = {accel_axis: 0., steer_axis: 0.}
|
||||
self.axes_order = [accel_axis, steer_axis]
|
||||
self.cancel = False
|
||||
|
||||
def update(self):
|
||||
joystick_event = get_gamepad()[0]
|
||||
event = (joystick_event.code, joystick_event.state)
|
||||
if event[0] == self.cancel_button:
|
||||
if event[1] == 1:
|
||||
self.cancel = True
|
||||
elif event[1] == 0: # state 0 is falling edge
|
||||
self.cancel = False
|
||||
elif event[0] in self.axes_values:
|
||||
self.max_axis_value[event[0]] = max(event[1], self.max_axis_value[event[0]])
|
||||
self.min_axis_value[event[0]] = min(event[1], self.min_axis_value[event[0]])
|
||||
|
||||
norm = -float(np.interp(event[1], [self.min_axis_value[event[0]], self.max_axis_value[event[0]]], [-1., 1.]))
|
||||
self.axes_values[event[0]] = norm if abs(norm) > 0.05 else 0. # center can be noisy, deadzone of 5%
|
||||
else:
|
||||
return False
|
||||
return True
|
||||
|
||||
def joystick_thread(joystick):
|
||||
while True:
|
||||
joystick.update()
|
||||
|
||||
def main(joystick):
|
||||
threading.Thread(target=joystick_thread, args=(joystick,), daemon=True).start()
|
||||
with PandaRunner() as (p, CI):
|
||||
CC = CarControl(enabled=False)
|
||||
while True:
|
||||
cd = [CanData(addr, dat, bus) for addr, dat, bus in p.can_recv()]
|
||||
CI.update([0, cd])
|
||||
|
||||
CC.actuators.accel = float(4.0*np.clip(joystick.axes_values['gb'], -1, 1))
|
||||
CC.actuators.steer = float(np.clip(joystick.axes_values['steer'], -1, 1))
|
||||
|
||||
from pprint import pprint
|
||||
pprint(CC)
|
||||
|
||||
_, can_sends = CI.apply(CC)
|
||||
p.can_send_many(can_sends, timeout=1000)
|
||||
|
||||
# 100Hz
|
||||
time.sleep(0.01)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='Test the car interface with a joystick. Uses keyboard by default.',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
|
||||
parser.add_argument('--mode', choices=['keyboard', 'gamepad', 'joystick'], default='keyboard')
|
||||
args = parser.parse_args()
|
||||
|
||||
print()
|
||||
joystick: Keyboard | Joystick
|
||||
if args.mode == 'keyboard':
|
||||
print('Gas/brake control: `W` and `S` keys')
|
||||
print('Steering control: `A` and `D` keys')
|
||||
print('Buttons')
|
||||
print('- `R`: Resets axes')
|
||||
print('- `C`: Cancel cruise control')
|
||||
joystick = Keyboard()
|
||||
else:
|
||||
joystick = Joystick(gamepad=(args.mode == 'gamepad'))
|
||||
main(joystick)
|
|
@ -0,0 +1,59 @@
|
|||
#!/usr/bin/env python3
|
||||
import sys
|
||||
import termios
|
||||
import atexit
|
||||
from select import select
|
||||
|
||||
STDIN_FD = sys.stdin.fileno()
|
||||
|
||||
class KBHit:
|
||||
def __init__(self) -> None:
|
||||
self.set_kbhit_terminal()
|
||||
|
||||
def set_kbhit_terminal(self) -> None:
|
||||
# Save the terminal settings
|
||||
self.old_term = termios.tcgetattr(STDIN_FD)
|
||||
self.new_term = self.old_term.copy()
|
||||
|
||||
# New terminal setting unbuffered
|
||||
self.new_term[3] &= ~(termios.ICANON | termios.ECHO)
|
||||
termios.tcsetattr(STDIN_FD, termios.TCSAFLUSH, self.new_term)
|
||||
|
||||
# Support normal-terminal reset at exit
|
||||
atexit.register(self.set_normal_term)
|
||||
|
||||
def set_normal_term(self) -> None:
|
||||
termios.tcsetattr(STDIN_FD, termios.TCSAFLUSH, self.old_term)
|
||||
|
||||
@staticmethod
|
||||
def getch() -> str:
|
||||
return sys.stdin.read(1)
|
||||
|
||||
@staticmethod
|
||||
def getarrow() -> int:
|
||||
c = sys.stdin.read(3)[2]
|
||||
vals = [65, 67, 66, 68]
|
||||
return vals.index(ord(c))
|
||||
|
||||
@staticmethod
|
||||
def kbhit():
|
||||
''' Returns True if keyboard character was hit, False otherwise.
|
||||
'''
|
||||
return select([sys.stdin], [], [], 0)[0] != []
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
kb = KBHit()
|
||||
|
||||
print('Hit any key, or ESC to exit')
|
||||
|
||||
while True:
|
||||
|
||||
if kb.kbhit():
|
||||
c = kb.getch()
|
||||
if c == '\x1b': # ESC
|
||||
break
|
||||
print(c)
|
||||
|
||||
kb.set_normal_term()
|
|
@ -0,0 +1,35 @@
|
|||
from contextlib import contextmanager
|
||||
|
||||
from panda import Panda
|
||||
from opendbc.car.car_helpers import get_car
|
||||
from opendbc.car.can_definitions import CanData
|
||||
|
||||
@contextmanager
|
||||
def PandaRunner():
|
||||
p = Panda()
|
||||
|
||||
def _can_recv(wait_for_one: bool = False) -> list[list[CanData]]:
|
||||
recv = p.can_recv()
|
||||
while len(recv) == 0 and wait_for_one:
|
||||
recv = p.can_recv()
|
||||
return [[CanData(addr, dat, bus) for addr, dat, bus in recv], ]
|
||||
|
||||
try:
|
||||
# setup + fingerprinting
|
||||
p.set_safety_mode(Panda.SAFETY_ELM327, 1)
|
||||
CI = get_car(_can_recv, p.can_send_many, p.set_obd, True)
|
||||
print("fingerprinted", CI.CP.carName)
|
||||
assert CI.CP.carFingerprint != "mock", "Unable to identify car. Check connections and ensure car is supported."
|
||||
|
||||
p.set_safety_mode(Panda.SAFETY_ELM327, 1)
|
||||
CI.init(CI.CP, _can_recv, p.can_send_many)
|
||||
p.set_safety_mode(Panda.SAFETY_TOYOTA, CI.CP.safetyConfigs[0].safetyParam)
|
||||
|
||||
yield p, CI
|
||||
finally:
|
||||
p.set_safety_mode(Panda.SAFETY_NOOUTPUT)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
with PandaRunner() as (p, CI):
|
||||
print(p.can_recv())
|
|
@ -33,6 +33,9 @@ docs = [
|
|||
"Jinja2",
|
||||
"natsort",
|
||||
]
|
||||
examples = [
|
||||
"inputs",
|
||||
]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = "--ignore=panda/ -Werror --strict-config --strict-markers --durations=10 -n auto"
|
||||
|
|
Loading…
Reference in New Issue